GH-2729: Reactor Kafka Binder SenderResult Support (#2730)

* GH-2729: Reactor Kafka Binder SenderResult Support

Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2729

Allow configuration of a `FluxMessageChannel` to receive `SenderResult`s.
Add `SenderResultMessageHandler` to consume from that channel.

Remove undocumented `sendResult` header, which has no value without the
sender result correlation metadata.

* Add integration test and polish property docs.

- result channel must be FMC for reactive binder.

* Add documentation.

* Remove SenderResultMessageHandler.
This commit is contained in:
Gary Russell
2023-05-17 16:10:20 -04:00
committed by Soby Chacko
parent acb81954f4
commit ee01cc9e50
5 changed files with 156 additions and 47 deletions

View File

@@ -90,7 +90,8 @@ public class KafkaProducerProperties {
/**
* The bean name of a MessageChannel to which successful send results should be sent;
* the bean must exist in the application context.
* the bean must exist in the application context. When using the reactive binder,
* the channel must be of type FluxMessageChannel.
*/
private String recordMetadataChannel;

View File

@@ -21,16 +21,13 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
@@ -56,6 +53,9 @@ import org.springframework.cloud.stream.binder.kafka.utils.BindingUtils;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.Lifecycle;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.AbstractMessageHandler;
@@ -65,6 +65,7 @@ import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@@ -169,7 +170,13 @@ public class ReactorKafkaBinder
SenderOptions.create(configs));
// TODO bean for converter; MCB doesn't use one on the producer side.
RecordMessageConverter converter = new MessagingMessageConverter();
return new ReactorMessageHandler(opts, converter, destination.getName());
AbstractApplicationContext applicationContext = getApplicationContext();
FluxMessageChannel resultChannel = null;
String channelName = producerProperties.getExtension().getRecordMetadataChannel();
if (channelName != null && applicationContext.containsBean(channelName)) {
resultChannel = applicationContext.getBean(channelName, FluxMessageChannel.class);
}
return new ReactorMessageHandler(opts, converter, destination.getName(), resultChannel);
}
@@ -316,35 +323,40 @@ public class ReactorKafkaBinder
private final SenderOptions<Object, Object> senderOptions;
@Nullable
private final FluxMessageChannel results;
private volatile KafkaSender<Object, Object> sender;
private volatile boolean running;
ReactorMessageHandler(SenderOptions<Object, Object> opts, RecordMessageConverter converter,
String topic) {
String topic, @Nullable FluxMessageChannel results) {
this.senderOptions = opts;
this.converter = converter;
this.topic = topic;
this.results = results;
}
@Override
protected void handleMessageInternal(Message<?> message) {
Object sendResultHeader = message.getHeaders().get("sendResult");
Sinks.One<RecordMetadata> sink = Sinks.one();
if (sendResultHeader instanceof AtomicReference result) {
result.set(sink.asMono());
}
if (this.sender != null) {
UUID uuid = UUID.randomUUID();
Object correlation = message.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID);
if (correlation == null) {
correlation = UUID.randomUUID();
}
@SuppressWarnings("unchecked")
SenderRecord<Object, Object, UUID> sr = SenderRecord.create(
(ProducerRecord<Object, Object>) converter.fromMessage(message, topic), uuid);
Flux<SenderResult<UUID>> result = sender.send(Flux.just(sr));
result.subscribe(res -> sink.emitValue(res.recordMetadata(), null));
}
else {
sink.emitError(new IllegalStateException("Handler is not running"), null);
SenderRecord<Object, Object, Object> sr = SenderRecord.create(
(ProducerRecord<Object, Object>) converter.fromMessage(message, topic), correlation);
Flux<SenderResult<Object>> result = sender.send(Flux.just(sr));
result.subscribe(res -> {
if (this.results != null) {
this.results.send(MessageBuilder.withPayload(res)
.copyHeaders(message.getHeaders())
.build());
}
});
}
}

View File

@@ -34,17 +34,19 @@ import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.SenderResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
@@ -107,34 +109,34 @@ class ReactorKafkaBinderIntegrationTests {
"--spring.cloud.stream.bindings.lowercase-out-0.destination=lowercased-words",
"--spring.cloud.stream.bindings.patternConsumer-in-0.group=grp3",
"--spring.cloud.stream.bindings.patternConsumer-in-0.destination=.*-words",
"--spring.cloud.stream.kafka.bindings.words1.producer.record-metadata-channel=sendResults",
"--spring.cloud.stream.kafka.bindings.words2.producer.record-metadata-channel=sendResults",
"--spring.cloud.stream.kafka.bindings.patternConsumer-in-0.consumer.destination-is-pattern=true",
"--spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRR",
"--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(),
excludeKafkaAutoConfigParam(excludeKafkaAutoConfig))) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.send("words1", "foobar");
template.send("words2", "BAZQUX");
StreamBridge streamBridge = context.getBean(StreamBridge.class);
streamBridge.send("words1", MessageBuilder.withPayload("foobar")
.setCorrelationId(42)
.build());
streamBridge.send("words2", MessageBuilder.withPayload("BAZQUX")
.setCorrelationId(43)
.build());
assertThat(KafkaTestUtils.getSingleRecord(consumer1, "uppercased-words"))
.isNotNull()
.extracting(ConsumerRecord::value)
.isEqualTo("FOOBAR");
assertThat(KafkaTestUtils.getSingleRecord(consumer1, "uppercased-words"))
.isNotNull()
.extracting(ConsumerRecord::value)
.isEqualTo("FOOBAR");
assertThat(KafkaTestUtils.getSingleRecord(consumer2, "lowercased-words"))
.isNotNull()
.extracting(ConsumerRecord::value)
.isEqualTo("bazqux");
assertThat(KafkaTestUtils.getSingleRecord(consumer2, "lowercased-words"))
.isNotNull()
.extracting(ConsumerRecord::value)
.isEqualTo("bazqux");
assertThat(recOptsCustOrder).containsExactly("two", "one", "two", "one", "two", "one");
await().untilAsserted(() -> assertThat(patternedDeliveries).contains("bazqux", "FOOBAR"));
}
finally {
pf.destroy();
}
assertThat(recOptsCustOrder).containsExactly("two", "one", "two", "one", "two", "one");
await().untilAsserted(() -> assertThat(patternedDeliveries).contains("bazqux", "FOOBAR"));
assertThat(context.getBean(ReactiveKafkaApplication.class).correlation).contains(42, 43);
}
}
@@ -146,6 +148,8 @@ class ReactorKafkaBinderIntegrationTests {
@EnableAutoConfiguration
public static class ReactiveKafkaApplication {
final List<Integer> correlation = Collections.synchronizedList(new ArrayList<>());
@Bean
RecordMessageConverter fullRR() {
return new RecordMessageConverter() {
@@ -210,6 +214,29 @@ class ReactorKafkaBinderIntegrationTests {
};
}
@Bean
FluxMessageChannel sendResults() {
return new FluxMessageChannel();
}
@ServiceActivator(inputChannel = "sendResults")
void handleResults(SenderResult<Integer> result) {
if (result.exception() != null) {
failureFor(result);
}
else {
successFor(result);
}
}
private void failureFor(SenderResult<Integer> result) {
this.correlation.clear();
}
private boolean successFor(SenderResult<Integer> result) {
return this.correlation.add(result.correlationMetadata());
}
}
}

View File

@@ -34,6 +34,7 @@ import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.sender.SenderResult;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@@ -45,7 +46,9 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerPro
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
@@ -55,6 +58,8 @@ import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.retry.support.RetryTemplate;
import static org.assertj.core.api.Assertions.assertThat;
@@ -296,7 +301,28 @@ public class ReactorKafkaBinderTests {
});
provisioner.setMetadataRetryOperations(new RetryTemplate());
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
binder.setApplicationContext(new GenericApplicationContext());
CountDownLatch latch = new CountDownLatch(1);
GenericApplicationContext context = new GenericApplicationContext();
context.registerBean("sendResults", FluxMessageChannel.class);
context.refresh();
FluxMessageChannel results = context.getBean("sendResults", FluxMessageChannel.class);
ConsumerEndpointFactoryBean fb = new ConsumerEndpointFactoryBean();
AtomicReference<SenderResult<Integer>> senderResult = new AtomicReference<>();
fb.setHandler(new MessageHandler() {
@SuppressWarnings("unchecked")
@Override
public void handleMessage(Message<?> message) throws MessagingException {
senderResult.set((SenderResult<Integer>) message.getPayload());
latch.countDown();
}
});
fb.setInputChannel(results);
fb.setBeanFactory(context.getBeanFactory());
fb.afterPropertiesSet();
fb.start();
binder.setApplicationContext(context);
@SuppressWarnings("rawtypes")
ObjectProvider<SenderOptionsCustomizer> cust = mock(ObjectProvider.class);
AtomicBoolean custCalled = new AtomicBoolean();
@@ -310,17 +336,15 @@ public class ReactorKafkaBinderTests {
KafkaProducerProperties ext = new KafkaProducerProperties();
ExtendedProducerProperties<KafkaProducerProperties> props =
new ExtendedProducerProperties<KafkaProducerProperties>(ext);
props.getExtension().setRecordMetadataChannel("sendResults");
Binding<MessageChannel> bindProducer = binder.bindProducer("testP", outbound, props);
AtomicReference<Mono<RecordMetadata>> sendResult = new AtomicReference<>();
outbound.send(MessageBuilder.withPayload("foo")
.setHeader("sendResult", sendResult)
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, 1)
.build());
CountDownLatch latch = new CountDownLatch(1);
sendResult.get().doOnNext(rmd -> {
latch.countDown();
}).subscribe();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(senderResult.get().correlationMetadata()).isEqualTo(1);
bindProducer.unbind();
assertThat(custCalled).isTrue();
}

View File

@@ -182,3 +182,48 @@ When `false` (default), a separate binding is created for each topic specified i
Starting with version 4.0.3, the `destination-is-pattern` Kafka binding consumer property is now supported.
The receiver options are conigured with a regex `Pattern`, allowing the binding to consume from any topic that matches the pattern.
=== Sender Result Channel
Starting with version 4.0.3, you can configure the `resultMetadataChannel` to receive `SenderResult<?>` s to determine success/failure of sends.
The `SenderResult` contains `correlationMetadata` to allow you to correlate results with sends; it also contains `RecordMetadata`, which indicates the `TopicPartition` and offset of the sent record.
The `resultMetadataChannel` **must** be a `FluxMessageChannel` instance.
Here is an example of how to use this feature, with correlation metadata of type `Integer`:
====
[source, java]
----
@Bean
FluxMessageChannel sendResults() {
return new FluxMessageChannel();
}
@ServiceActivator(inputChannel = "sendResults")
void handleResults(SenderResult<Integer> result) {
if (result.exception() != null) {
failureFor(result);
}
else {
successFor(result);
}
}
----
====
To set the correlation metadata on an output record, set the `CORRELATION_ID` header:
====
[source, java]
----
streamBridge.send("words1", MessageBuilder.withPayload("foobar")
.setCorrelationId(42)
.build());
----
====
When using the feature with a `Function`, the function output type must be a `Message<?>` with the correlation id header set to the desired value.
Metadata should be unique, at least for the duration of the send.