diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaProducerProperties.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaProducerProperties.java index 54ca36713..a64d08dda 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaProducerProperties.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaProducerProperties.java @@ -90,7 +90,8 @@ public class KafkaProducerProperties { /** * The bean name of a MessageChannel to which successful send results should be sent; - * the bean must exist in the application context. + * the bean must exist in the application context. When using the reactive binder, + * the channel must be of type FluxMessageChannel. */ private String recordMetadataChannel; diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinder.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinder.java index 0a0c1ff68..371ac967f 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinder.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinder.java @@ -21,16 +21,13 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverRecord; @@ -56,6 +53,9 @@ import org.springframework.cloud.stream.binder.kafka.utils.BindingUtils; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.context.Lifecycle; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.handler.AbstractMessageHandler; @@ -65,6 +65,7 @@ import org.springframework.kafka.support.converter.KafkaMessageHeaders; import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -169,7 +170,13 @@ public class ReactorKafkaBinder SenderOptions.create(configs)); // TODO bean for converter; MCB doesn't use one on the producer side. RecordMessageConverter converter = new MessagingMessageConverter(); - return new ReactorMessageHandler(opts, converter, destination.getName()); + AbstractApplicationContext applicationContext = getApplicationContext(); + FluxMessageChannel resultChannel = null; + String channelName = producerProperties.getExtension().getRecordMetadataChannel(); + if (channelName != null && applicationContext.containsBean(channelName)) { + resultChannel = applicationContext.getBean(channelName, FluxMessageChannel.class); + } + return new ReactorMessageHandler(opts, converter, destination.getName(), resultChannel); } @@ -316,35 +323,40 @@ public class ReactorKafkaBinder private final SenderOptions senderOptions; + @Nullable + private final FluxMessageChannel results; + private volatile KafkaSender sender; private volatile boolean running; ReactorMessageHandler(SenderOptions opts, RecordMessageConverter converter, - String topic) { + String topic, @Nullable FluxMessageChannel results) { this.senderOptions = opts; this.converter = converter; this.topic = topic; + this.results = results; } @Override protected void handleMessageInternal(Message message) { - Object sendResultHeader = message.getHeaders().get("sendResult"); - Sinks.One sink = Sinks.one(); - if (sendResultHeader instanceof AtomicReference result) { - result.set(sink.asMono()); - } if (this.sender != null) { - UUID uuid = UUID.randomUUID(); + Object correlation = message.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID); + if (correlation == null) { + correlation = UUID.randomUUID(); + } @SuppressWarnings("unchecked") - SenderRecord sr = SenderRecord.create( - (ProducerRecord) converter.fromMessage(message, topic), uuid); - Flux> result = sender.send(Flux.just(sr)); - result.subscribe(res -> sink.emitValue(res.recordMetadata(), null)); - } - else { - sink.emitError(new IllegalStateException("Handler is not running"), null); + SenderRecord sr = SenderRecord.create( + (ProducerRecord) converter.fromMessage(message, topic), correlation); + Flux> result = sender.send(Flux.just(sr)); + result.subscribe(res -> { + if (this.results != null) { + this.results.send(MessageBuilder.withPayload(res) + .copyHeaders(message.getHeaders()) + .build()); + } + }); } } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java index f94454850..79c7bb4b8 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java @@ -34,17 +34,19 @@ import org.junit.jupiter.params.provider.ValueSource; import reactor.core.publisher.Flux; import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverRecord; +import reactor.kafka.sender.SenderResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.support.MessageBuilder; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; @@ -107,34 +109,34 @@ class ReactorKafkaBinderIntegrationTests { "--spring.cloud.stream.bindings.lowercase-out-0.destination=lowercased-words", "--spring.cloud.stream.bindings.patternConsumer-in-0.group=grp3", "--spring.cloud.stream.bindings.patternConsumer-in-0.destination=.*-words", + "--spring.cloud.stream.kafka.bindings.words1.producer.record-metadata-channel=sendResults", + "--spring.cloud.stream.kafka.bindings.words2.producer.record-metadata-channel=sendResults", "--spring.cloud.stream.kafka.bindings.patternConsumer-in-0.consumer.destination-is-pattern=true", "--spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRR", "--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(), excludeKafkaAutoConfigParam(excludeKafkaAutoConfig))) { - Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); - DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); - try { - KafkaTemplate template = new KafkaTemplate<>(pf, true); - template.send("words1", "foobar"); - template.send("words2", "BAZQUX"); + StreamBridge streamBridge = context.getBean(StreamBridge.class); + streamBridge.send("words1", MessageBuilder.withPayload("foobar") + .setCorrelationId(42) + .build()); + streamBridge.send("words2", MessageBuilder.withPayload("BAZQUX") + .setCorrelationId(43) + .build()); - assertThat(KafkaTestUtils.getSingleRecord(consumer1, "uppercased-words")) - .isNotNull() - .extracting(ConsumerRecord::value) - .isEqualTo("FOOBAR"); + assertThat(KafkaTestUtils.getSingleRecord(consumer1, "uppercased-words")) + .isNotNull() + .extracting(ConsumerRecord::value) + .isEqualTo("FOOBAR"); - assertThat(KafkaTestUtils.getSingleRecord(consumer2, "lowercased-words")) - .isNotNull() - .extracting(ConsumerRecord::value) - .isEqualTo("bazqux"); + assertThat(KafkaTestUtils.getSingleRecord(consumer2, "lowercased-words")) + .isNotNull() + .extracting(ConsumerRecord::value) + .isEqualTo("bazqux"); - assertThat(recOptsCustOrder).containsExactly("two", "one", "two", "one", "two", "one"); - await().untilAsserted(() -> assertThat(patternedDeliveries).contains("bazqux", "FOOBAR")); - } - finally { - pf.destroy(); - } + assertThat(recOptsCustOrder).containsExactly("two", "one", "two", "one", "two", "one"); + await().untilAsserted(() -> assertThat(patternedDeliveries).contains("bazqux", "FOOBAR")); + assertThat(context.getBean(ReactiveKafkaApplication.class).correlation).contains(42, 43); } } @@ -146,6 +148,8 @@ class ReactorKafkaBinderIntegrationTests { @EnableAutoConfiguration public static class ReactiveKafkaApplication { + final List correlation = Collections.synchronizedList(new ArrayList<>()); + @Bean RecordMessageConverter fullRR() { return new RecordMessageConverter() { @@ -210,6 +214,29 @@ class ReactorKafkaBinderIntegrationTests { }; } + @Bean + FluxMessageChannel sendResults() { + return new FluxMessageChannel(); + } + + @ServiceActivator(inputChannel = "sendResults") + void handleResults(SenderResult result) { + if (result.exception() != null) { + failureFor(result); + } + else { + successFor(result); + } + } + + private void failureFor(SenderResult result) { + this.correlation.clear(); + } + + private boolean successFor(SenderResult result) { + return this.correlation.add(result.correlationMetadata()); + } + } } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java index cdc2cdf19..919691745 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java @@ -34,6 +34,7 @@ import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.kafka.receiver.ReceiverOffset; +import reactor.kafka.sender.SenderResult; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; @@ -45,7 +46,9 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerPro import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties; import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; import org.springframework.context.support.GenericApplicationContext; +import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.channel.FluxMessageChannel; +import org.springframework.integration.config.ConsumerEndpointFactoryBean; import org.springframework.integration.support.MessageBuilder; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; @@ -55,6 +58,8 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; import org.springframework.retry.support.RetryTemplate; import static org.assertj.core.api.Assertions.assertThat; @@ -296,7 +301,28 @@ public class ReactorKafkaBinderTests { }); provisioner.setMetadataRetryOperations(new RetryTemplate()); ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner); - binder.setApplicationContext(new GenericApplicationContext()); + CountDownLatch latch = new CountDownLatch(1); + GenericApplicationContext context = new GenericApplicationContext(); + context.registerBean("sendResults", FluxMessageChannel.class); + context.refresh(); + FluxMessageChannel results = context.getBean("sendResults", FluxMessageChannel.class); + ConsumerEndpointFactoryBean fb = new ConsumerEndpointFactoryBean(); + AtomicReference> senderResult = new AtomicReference<>(); + fb.setHandler(new MessageHandler() { + + @SuppressWarnings("unchecked") + @Override + public void handleMessage(Message message) throws MessagingException { + senderResult.set((SenderResult) message.getPayload()); + latch.countDown(); + } + + }); + fb.setInputChannel(results); + fb.setBeanFactory(context.getBeanFactory()); + fb.afterPropertiesSet(); + fb.start(); + binder.setApplicationContext(context); @SuppressWarnings("rawtypes") ObjectProvider cust = mock(ObjectProvider.class); AtomicBoolean custCalled = new AtomicBoolean(); @@ -310,17 +336,15 @@ public class ReactorKafkaBinderTests { KafkaProducerProperties ext = new KafkaProducerProperties(); ExtendedProducerProperties props = new ExtendedProducerProperties(ext); + props.getExtension().setRecordMetadataChannel("sendResults"); Binding bindProducer = binder.bindProducer("testP", outbound, props); AtomicReference> sendResult = new AtomicReference<>(); outbound.send(MessageBuilder.withPayload("foo") - .setHeader("sendResult", sendResult) + .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, 1) .build()); - CountDownLatch latch = new CountDownLatch(1); - sendResult.get().doOnNext(rmd -> { - latch.countDown(); - }).subscribe(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(senderResult.get().correlationMetadata()).isEqualTo(1); bindProducer.unbind(); assertThat(custCalled).isTrue(); } diff --git a/docs/src/main/asciidoc/kafka/kafka-reactive.adoc b/docs/src/main/asciidoc/kafka/kafka-reactive.adoc index 76533a71c..f85fece0d 100644 --- a/docs/src/main/asciidoc/kafka/kafka-reactive.adoc +++ b/docs/src/main/asciidoc/kafka/kafka-reactive.adoc @@ -182,3 +182,48 @@ When `false` (default), a separate binding is created for each topic specified i Starting with version 4.0.3, the `destination-is-pattern` Kafka binding consumer property is now supported. The receiver options are conigured with a regex `Pattern`, allowing the binding to consume from any topic that matches the pattern. + +=== Sender Result Channel + +Starting with version 4.0.3, you can configure the `resultMetadataChannel` to receive `SenderResult` s to determine success/failure of sends. + +The `SenderResult` contains `correlationMetadata` to allow you to correlate results with sends; it also contains `RecordMetadata`, which indicates the `TopicPartition` and offset of the sent record. + +The `resultMetadataChannel` **must** be a `FluxMessageChannel` instance. + +Here is an example of how to use this feature, with correlation metadata of type `Integer`: + +==== +[source, java] +---- +@Bean +FluxMessageChannel sendResults() { + return new FluxMessageChannel(); +} + +@ServiceActivator(inputChannel = "sendResults") +void handleResults(SenderResult result) { + if (result.exception() != null) { + failureFor(result); + } + else { + successFor(result); + } +} +---- +==== + +To set the correlation metadata on an output record, set the `CORRELATION_ID` header: + +==== +[source, java] +---- +streamBridge.send("words1", MessageBuilder.withPayload("foobar") + .setCorrelationId(42) + .build()); +---- +==== + +When using the feature with a `Function`, the function output type must be a `Message` with the correlation id header set to the desired value. + +Metadata should be unique, at least for the duration of the send.