diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java index ac7ff5c4..b27fe22e 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java @@ -49,18 +49,19 @@ public class DefaultReactivePulsarSenderFactory implements ReactivePulsarSend private final ReactiveMessageSenderSpec reactiveMessageSenderSpec; + @Nullable private final ReactiveMessageSenderCache reactiveMessageSenderCache; public DefaultReactivePulsarSenderFactory(PulsarClient pulsarClient, - ReactiveMessageSenderSpec reactiveMessageSenderSpec, - ReactiveMessageSenderCache reactiveMessageSenderCache) { + @Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec, + @Nullable ReactiveMessageSenderCache reactiveMessageSenderCache) { this(AdaptedReactivePulsarClientFactory.create(pulsarClient), reactiveMessageSenderSpec, reactiveMessageSenderCache); } public DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient, - ReactiveMessageSenderSpec reactiveMessageSenderSpec, - ReactiveMessageSenderCache reactiveMessageSenderCache) { + @Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec, + @Nullable ReactiveMessageSenderCache reactiveMessageSenderCache) { this.reactivePulsarClient = reactivePulsarClient; this.reactiveMessageSenderSpec = new ImmutableReactiveMessageSenderSpec( reactiveMessageSenderSpec != null ? reactiveMessageSenderSpec : new MutableReactiveMessageSenderSpec()); diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.java index 5e991836..3751485a 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.java @@ -126,16 +126,6 @@ public interface ReactivePulsarOperations { */ SendMessageBuilder newMessages(Publisher messages); - /** - * Create a {@link SendMessageBuilder builder} for configuring and sending multiple - * messages reactively. - * @param messages the messages to send - * @param messageType the type of messages being sent - helpful for topic resolution - * when schema is not specified during a {@code sendMany} operation - * @return the builder to configure and send the message - */ - SendMessageBuilder newMessages(Publisher messages, Class messageType); - /** * Builder that can be used to configure and send a message. Provides more options * than the send methods provided by {@link ReactivePulsarOperations}. diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java index f1c6b757..29b7e3ec 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java @@ -101,17 +101,17 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { @Override public Flux send(Publisher messages, @Nullable Schema schema) { - return doSendMany(null, Flux.from(messages), null, schema); + return doSendMany(null, Flux.from(messages), schema); } @Override public Flux send(@Nullable String topic, Publisher messages) { - return doSendMany(topic, Flux.from(messages), null, null); + return doSendMany(topic, Flux.from(messages), null); } @Override public Flux send(@Nullable String topic, Publisher messages, @Nullable Schema schema) { - return doSendMany(topic, Flux.from(messages), null, schema); + return doSendMany(topic, Flux.from(messages), schema); } @Override @@ -121,12 +121,7 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { @Override public SendMessageBuilder newMessages(Publisher messages) { - return new SendMessageBuilderImpl<>(this, messages, null); - } - - @Override - public SendMessageBuilder newMessages(Publisher messages, Class messageType) { - return new SendMessageBuilderImpl<>(this, messages, messageType); + return new SendMessageBuilderImpl<>(this, messages); } private Mono doSend(@Nullable String topic, T message, @Nullable Schema schema, @@ -142,25 +137,21 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { // @formatter:on } - private Flux doSendMany(@Nullable String topic, Flux messages, @Nullable Class messageType, - @Nullable Schema schema) { - if (schema != null) { - /* - * If schema specified we can create the message sender right away and use - * ReactiveMessageSender::sendMany to send them as a stream. Otherwise, we - * need to wait to get a message to create the sender, and we can't share it - * between messages. So we create one each time and use - * ReactiveMessageSender::sendOne to send messages individually. - */ - String topicName = resolveTopic(topic, messageType); - ReactiveMessageSender sender = createMessageSender(topicName, null, schema, null); - return messages.map(MessageSpec::of).as(sender::sendMany) - .doOnError(ex -> this.logger.error(ex, - () -> String.format("Failed to send messages to '%s' topic", topicName))) - .doOnNext( - msgId -> this.logger.trace(() -> String.format("Sent messages to '%s' topic", topicName))); - } - return messages.flatMapSequential(message -> doSend(topic, message, null, null, null)); + private Flux doSendMany(@Nullable String topic, Flux messages, @Nullable Schema schema) { + return messages.switchOnFirst((firstSignal, messageFlux) -> { + T firstMessage = firstSignal.get(); + if (firstMessage != null && firstSignal.isOnNext()) { + String topicName = resolveTopic(topic, firstMessage.getClass()); + ReactiveMessageSender sender = createMessageSender(topicName, firstMessage, schema, null); + return messageFlux.map(MessageSpec::of).as(sender::sendMany) + .doOnError(ex -> this.logger.error(ex, + () -> String.format("Failed to send messages to '%s' topic", topicName))) + .doOnNext(msgId -> this.logger + .trace(() -> String.format("Sent messages to '%s' topic", topicName))); + } + // The flux has errored or is completed + return messageFlux.thenMany(Flux.empty()); + }); } private String resolveTopic(@Nullable String topic, @Nullable Class messageType) { @@ -187,10 +178,11 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { private ReactiveMessageSender createMessageSender(@Nullable String topic, T message, @Nullable Schema schema, @Nullable ReactiveMessageSenderBuilderCustomizer customizer) { - if (schema == null) { - schema = this.schemaResolver.getSchema(message); + Schema resolvedSchema = schema == null ? this.schemaResolver.getSchema(message) : schema; + if (resolvedSchema == null) { + throw new IllegalArgumentException("Couldn't resolve a schema for the message"); } - return this.reactiveMessageSenderFactory.createSender(schema, topic, customizer); + return this.reactiveMessageSenderFactory.createSender(resolvedSchema, topic, customizer); } public static class SendMessageBuilderImpl implements SendMessageBuilder { @@ -201,9 +193,6 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { private final Publisher messages; - @Nullable - private Class messageType; - private String topic; private Schema schema; @@ -216,15 +205,12 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { this.template = template; this.message = message; this.messages = null; - this.messageType = null; } - SendMessageBuilderImpl(ReactivePulsarTemplate template, Publisher messages, - @Nullable Class messageType) { + SendMessageBuilderImpl(ReactivePulsarTemplate template, Publisher messages) { this.template = template; this.message = null; this.messages = messages; - this.messageType = messageType; } @Override @@ -260,7 +246,7 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { @Override public Flux sendMany() { - return this.template.doSendMany(this.topic, Flux.from(this.messages), this.messageType, this.schema); + return this.template.doSendMany(this.topic, Flux.from(this.messages), this.schema); } } diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java index bb642cc9..8588331a 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java @@ -86,8 +86,8 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport { } for (int i = 0; i < 10; i++) { - assertThat(consumer.receiveAsync()).succeedsWithin(Duration.ofSeconds(3)) - .extracting(Message::getValue).isEqualTo(foos.get(i)); + assertThat(consumer.receiveAsync().thenApply(Message::getValue)) + .succeedsWithin(Duration.ofSeconds(3)).isEqualTo(foos.get(i)); } } } @@ -139,7 +139,7 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport { @ParameterizedTest(name = "{0}") @MethodSource("sendManyWithInferredTopicProvider") - void sendManyWithInferredTopic(String testName, boolean shouldPass, + void sendManyWithInferredTopic(String testName, BiConsumer, ReactivePulsarTemplate> sendHandler) throws Exception { String topic = "rptt-" + testName + "-topic"; String sub = "rptt-" + testName + "-sub"; @@ -163,40 +163,25 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport { sendHandler.accept(Collections.singletonList(theSingleFoo), pulsarTemplate); - if (shouldPass) { - assertThat(consumer.receiveAsync()).succeedsWithin(Duration.ofSeconds(3)) - .extracting(Message::getValue).isEqualTo(theSingleFoo); - } - else { - assertThat(consumer.receive(3, TimeUnit.SECONDS)).isNull(); - } + assertThat(consumer.receiveAsync()).succeedsWithin(Duration.ofSeconds(3)).extracting(Message::getValue) + .isEqualTo(theSingleFoo); + } } } static Stream sendManyWithInferredTopicProvider() { - // NOTE: Topic resolution requires in ReactivePulsarTemplate.sendMany requires - // the messageType to be set when the schema is specified as it creates the - // message sender spec before it subscribes to the publisher of messages (iow - // it does not have access to a message to determine message type in this case return Stream.of( - arguments("simpleApiNoSchemaPasses", true, + arguments("simpleApiNoSchema", (BiConsumer, ReactivePulsarTemplate>) (data, template) -> template .send(Flux.fromIterable(data)).subscribe()), - arguments("simpleApiWithSchemaAlwaysFails", false, + arguments("simpleApiWithSchema", (BiConsumer, ReactivePulsarTemplate>) (data, template) -> template .send(Flux.fromIterable(data), Schema.STRING).subscribe()), - arguments("fluentApiNoSchemaWithMsgTypePasses", true, - (BiConsumer, ReactivePulsarTemplate>) (data, template) -> template - .newMessages(Flux.fromIterable(data), String.class).sendMany().subscribe()), - arguments("fluentApiNoSchemaNoMsgTypePasses", true, + arguments("fluentApiNoSchema", (BiConsumer, ReactivePulsarTemplate>) (data, template) -> template .newMessages(Flux.fromIterable(data)).sendMany().subscribe()), - arguments("fluentApiWithSchemaWithMsgTypePasses", true, - (BiConsumer, ReactivePulsarTemplate>) (data, template) -> template - .newMessages(Flux.fromIterable(data), String.class).withSchema(Schema.STRING).sendMany() - .subscribe()), - arguments("fluentApiWithSchemaNoMsgTypeFails", false, + arguments("fluentApiWithSchema", (BiConsumer, ReactivePulsarTemplate>) (data, template) -> template .newMessages(Flux.fromIterable(data)).withSchema(Schema.STRING).sendMany() .subscribe())); diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java index b494d50a..5c987fb2 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java @@ -302,6 +302,7 @@ class DefaultReactivePulsarMessageListenerContainerTests implements PulsarTestCo container.setConsumerCustomizer(b -> b.deadLetterPolicy(deadLetterPolicy)); container.start(); MutableReactiveMessageSenderSpec prodConfig = new MutableReactiveMessageSenderSpec(); + prodConfig.setBatchingEnabled(false); prodConfig.setTopicName(topic); DefaultReactivePulsarSenderFactory pulsarProducerFactory = new DefaultReactivePulsarSenderFactory<>( reactivePulsarClient, prodConfig, null);