diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.java index 3751485a..ac85e0df 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.java @@ -111,49 +111,43 @@ public interface ReactivePulsarOperations { Flux send(@Nullable String topic, Publisher messages, @Nullable Schema schema); /** - * Create a {@link SendMessageBuilder builder} for configuring and sending a message - * reactively. + * Create a {@link SendOneMessageBuilder builder} for configuring and sending a + * message reactively. * @param message the payload of the message * @return the builder to configure and send the message */ - SendMessageBuilder newMessage(T message); + SendOneMessageBuilder newMessage(T message); /** - * Create a {@link SendMessageBuilder builder} for configuring and sending multiple - * messages reactively. + * Create a {@link SendManyMessageBuilder builder} for configuring and sending + * multiple messages reactively. * @param messages the messages to send * @return the builder to configure and send the message */ - SendMessageBuilder newMessages(Publisher messages); + SendManyMessageBuilder newMessages(Publisher messages); /** * Builder that can be used to configure and send a message. Provides more options * than the send methods provided by {@link ReactivePulsarOperations}. * + * @param the builder type * @param the message payload type */ - interface SendMessageBuilder { + sealed interface SendMessageBuilder permits SendOneMessageBuilder, SendManyMessageBuilder { /** * Specify the topic to send the message to. * @param topic the destination topic * @return the current builder with the destination topic specified */ - SendMessageBuilder withTopic(String topic); + O withTopic(String topic); /** * Specify the schema to use when sending the message. * @param schema the schema to use * @return the current builder with the schema specified */ - SendMessageBuilder withSchema(Schema schema); - - /** - * Specifies the message customizer to use to further configure the message. - * @param customizer the message customizer - * @return the current builder with the message customizer specified - */ - SendMessageBuilder withMessageCustomizer(MessageSpecBuilderCustomizer customizer); + O withSchema(Schema schema); /** * Specifies the customizer to use to further configure the reactive sender @@ -162,7 +156,18 @@ public interface ReactivePulsarOperations { * @return the current builder with the reactive sender builder customizer * specified */ - SendMessageBuilder withSenderCustomizer(ReactiveMessageSenderBuilderCustomizer customizer); + O withSenderCustomizer(ReactiveMessageSenderBuilderCustomizer customizer); + + } + + non-sealed interface SendOneMessageBuilder extends SendMessageBuilder, T> { + + /** + * Specifies the message customizer to use to further configure the message. + * @param customizer the message customizer + * @return the current builder with the message customizer specified + */ + SendOneMessageBuilder withMessageCustomizer(MessageSpecBuilderCustomizer customizer); /** * Send the message in a reactive manner using the configured specification. @@ -170,12 +175,16 @@ public interface ReactivePulsarOperations { */ Mono send(); + } + + non-sealed interface SendManyMessageBuilder extends SendMessageBuilder, T> { + /** * Send the messages in a reactive manner using the configured specification. * @return the ids assigned by the broker to the published messages in the same * order as they were sent */ - Flux sendMany(); + Flux send(); } diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java index 29b7e3ec..e7f58e1d 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java @@ -101,27 +101,27 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { @Override public Flux send(Publisher messages, @Nullable Schema schema) { - return doSendMany(null, Flux.from(messages), schema); + return doSendMany(null, Flux.from(messages), schema, null); } @Override public Flux send(@Nullable String topic, Publisher messages) { - return doSendMany(topic, Flux.from(messages), null); + return doSendMany(topic, Flux.from(messages), null, null); } @Override public Flux send(@Nullable String topic, Publisher messages, @Nullable Schema schema) { - return doSendMany(topic, Flux.from(messages), schema); + return doSendMany(topic, Flux.from(messages), schema, null); } @Override - public SendMessageBuilderImpl newMessage(T message) { - return new SendMessageBuilderImpl<>(this, message); + public SendOneMessageBuilder newMessage(T message) { + return new SendOneMessageBuilderImpl<>(this, message); } @Override - public SendMessageBuilder newMessages(Publisher messages) { - return new SendMessageBuilderImpl<>(this, messages); + public SendManyMessageBuilder newMessages(Publisher messages) { + return new SendManyMessageBuilderImpl<>(this, messages); } private Mono doSend(@Nullable String topic, T message, @Nullable Schema schema, @@ -137,12 +137,13 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { // @formatter:on } - private Flux doSendMany(@Nullable String topic, Flux messages, @Nullable Schema schema) { + private Flux doSendMany(@Nullable String topic, Flux messages, @Nullable Schema schema, + @Nullable ReactiveMessageSenderBuilderCustomizer customizer) { return messages.switchOnFirst((firstSignal, messageFlux) -> { T firstMessage = firstSignal.get(); if (firstMessage != null && firstSignal.isOnNext()) { String topicName = resolveTopic(topic, firstMessage.getClass()); - ReactiveMessageSender sender = createMessageSender(topicName, firstMessage, schema, null); + ReactiveMessageSender sender = createMessageSender(topicName, firstMessage, schema, customizer); return messageFlux.map(MessageSpec::of).as(sender::sendMany) .doOnError(ex -> this.logger.error(ex, () -> String.format("Failed to send messages to '%s' topic", topicName))) @@ -185,68 +186,83 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { return this.reactiveMessageSenderFactory.createSender(resolvedSchema, topic, customizer); } - public static class SendMessageBuilderImpl implements SendMessageBuilder { + private static class SendMessageBuilderImpl { - private final ReactivePulsarTemplate template; + protected final ReactivePulsarTemplate template; + + @Nullable + protected String topic; + + @Nullable + protected Schema schema; + + @Nullable + protected ReactiveMessageSenderBuilderCustomizer senderCustomizer; + + SendMessageBuilderImpl(ReactivePulsarTemplate template) { + this.template = template; + } + + @SuppressWarnings("unchecked") + public O withTopic(String topic) { + this.topic = topic; + return (O) this; + } + + @SuppressWarnings("unchecked") + public O withSchema(Schema schema) { + this.schema = schema; + return (O) this; + } + + @SuppressWarnings("unchecked") + public O withSenderCustomizer(ReactiveMessageSenderBuilderCustomizer senderCustomizer) { + this.senderCustomizer = senderCustomizer; + return (O) this; + } + + } + + private static final class SendOneMessageBuilderImpl + extends SendMessageBuilderImpl, T> implements SendOneMessageBuilder { private final T message; - private final Publisher messages; - - private String topic; - - private Schema schema; - + @Nullable private MessageSpecBuilderCustomizer messageCustomizer; - private ReactiveMessageSenderBuilderCustomizer senderCustomizer; - - SendMessageBuilderImpl(ReactivePulsarTemplate template, T message) { - this.template = template; + SendOneMessageBuilderImpl(ReactivePulsarTemplate template, T message) { + super(template); this.message = message; - this.messages = null; - } - - SendMessageBuilderImpl(ReactivePulsarTemplate template, Publisher messages) { - this.template = template; - this.message = null; - this.messages = messages; } @Override - public SendMessageBuilderImpl withTopic(String topic) { - this.topic = topic; - return this; - } - - @Override - public SendMessageBuilderImpl withSchema(Schema schema) { - this.schema = schema; - return this; - } - - @Override - public SendMessageBuilderImpl withMessageCustomizer(MessageSpecBuilderCustomizer messageCustomizer) { + public SendOneMessageBuilderImpl withMessageCustomizer(MessageSpecBuilderCustomizer messageCustomizer) { this.messageCustomizer = messageCustomizer; return this; } - @Override - public SendMessageBuilderImpl withSenderCustomizer( - ReactiveMessageSenderBuilderCustomizer senderCustomizer) { - this.senderCustomizer = senderCustomizer; - return this; - } - @Override public Mono send() { return this.template.doSend(this.topic, this.message, this.schema, this.messageCustomizer, this.senderCustomizer); } + } + + private static final class SendManyMessageBuilderImpl + extends SendMessageBuilderImpl, T> implements SendManyMessageBuilder { + + private final Publisher messages; + + SendManyMessageBuilderImpl(ReactivePulsarTemplate template, Publisher messages) { + super(template); + this.messages = messages; + } + @Override - public Flux sendMany() { - return this.template.doSendMany(this.topic, Flux.from(this.messages), this.schema); + public Flux send() { + return this.template.doSendMany(this.topic, Flux.from(this.messages), this.schema, this.senderCustomizer); } } diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java index 8588331a..6a3ddd4b 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java @@ -81,7 +81,7 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport { pulsarTemplate.send(Flux.fromIterable(foos), Schema.JSON(Foo.class)).subscribe(); } else { - pulsarTemplate.newMessages(Flux.fromIterable(foos)).withSchema(Schema.JSON(Foo.class)).sendMany() + pulsarTemplate.newMessages(Flux.fromIterable(foos)).withSchema(Schema.JSON(Foo.class)).send() .subscribe(); } @@ -121,7 +121,7 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport { pulsarTemplate.send(Flux.fromIterable(foos)).subscribe(); } else { - pulsarTemplate.newMessages(Flux.fromIterable(foos)).sendMany().subscribe(); + pulsarTemplate.newMessages(Flux.fromIterable(foos)).send().subscribe(); } // TODO figure out if expected to not be ordered when schema not set on @@ -180,11 +180,10 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport { .send(Flux.fromIterable(data), Schema.STRING).subscribe()), arguments("fluentApiNoSchema", (BiConsumer, ReactivePulsarTemplate>) (data, template) -> template - .newMessages(Flux.fromIterable(data)).sendMany().subscribe()), + .newMessages(Flux.fromIterable(data)).send().subscribe()), arguments("fluentApiWithSchema", (BiConsumer, ReactivePulsarTemplate>) (data, template) -> template - .newMessages(Flux.fromIterable(data)).withSchema(Schema.STRING).sendMany() - .subscribe())); + .newMessages(Flux.fromIterable(data)).withSchema(Schema.STRING).send().subscribe())); } @ParameterizedTest(name = "{0}") @@ -230,7 +229,7 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport { } } else { - ReactivePulsarTemplate.SendMessageBuilderImpl messageBuilder = pulsarTemplate + ReactivePulsarTemplate.SendOneMessageBuilder messageBuilder = pulsarTemplate .newMessage(msgPayload); if (testArgs.explicitTopic) { messageBuilder = messageBuilder.withTopic(topic);