diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java index b294ce8b..f2323cee 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java @@ -18,6 +18,7 @@ package org.springframework.pulsar.reactive.core; import java.util.Collections; import java.util.List; +import java.util.Objects; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -68,11 +69,6 @@ public class DefaultReactivePulsarSenderFactory implements ReactivePulsarSend this.reactiveMessageSenderCache = reactiveMessageSenderCache; } - @Override - public ReactiveMessageSender createSender(Schema schema) { - return doCreateReactiveMessageSender(schema, null, null); - } - @Override public ReactiveMessageSender createSender(Schema schema, @Nullable String topic) { return doCreateReactiveMessageSender(schema, topic, null); @@ -93,7 +89,7 @@ public class DefaultReactivePulsarSenderFactory implements ReactivePulsarSend private ReactiveMessageSender doCreateReactiveMessageSender(Schema schema, @Nullable String topic, @Nullable List> customizers) { - + Objects.requireNonNull(schema, "Schema must be specified"); String resolvedTopic = ReactiveMessageSenderUtils.resolveTopicName(topic, this); this.logger.trace(() -> "Creating reactive message sender for '%s' topic".formatted(resolvedTopic)); ReactiveMessageSenderBuilder sender = this.reactivePulsarClient.messageSender(schema); diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarSenderFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarSenderFactory.java index 468219f6..6955c7e8 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarSenderFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarSenderFactory.java @@ -33,13 +33,6 @@ import org.springframework.lang.Nullable; */ public interface ReactivePulsarSenderFactory { - /** - * Create a reactive message sender that will send messages to the default topic. - * @param schema the schema of the messages to be sent - * @return the reactive message sender - */ - ReactiveMessageSender createSender(Schema schema); - /** * Create a reactive message sender. * @param topic the topic to send messages to or {@code null} to use the default topic diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactiveMessageSenderFactoryTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactiveMessageSenderFactoryTests.java index 6e093f3e..fbf4fee4 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactiveMessageSenderFactoryTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactiveMessageSenderFactoryTests.java @@ -18,8 +18,10 @@ package org.springframework.pulsar.reactive.core; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatNullPointerException; import java.util.Arrays; +import java.util.Collections; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.PulsarClient; @@ -76,26 +78,15 @@ class DefaultReactiveMessageSenderFactoryTests { return new DefaultReactivePulsarSenderFactory<>((PulsarClient) null, null, cache); } - @Nested - class CreateSenderSchemaOnlyApi { - - @Test - void withDefaultTopic() { - var sender = newSenderFactoryWithDefaultTopic("topic0").createSender(schema); - assertThatSenderHasTopic(sender, "topic0"); - } - - @Test - void withoutDefaultTopic() { - assertThatIllegalArgumentException().isThrownBy(() -> newSenderFactory().createSender(schema)) - .withMessageContaining("Topic must be specified when no default topic is configured"); - } - - } - @Nested class CreateSenderSchemaAndTopicApi { + @Test + void withoutSchema() { + assertThatNullPointerException().isThrownBy(() -> newSenderFactory().createSender(null, "topic0")) + .withMessageContaining("Schema must be specified"); + } + @Test void topicSpecifiedWithDefaultTopic() { var sender = newSenderFactoryWithDefaultTopic("topic0").createSender(schema, "topic1"); @@ -132,6 +123,14 @@ class DefaultReactiveMessageSenderFactoryTests { (senderSpec) -> assertThat(senderSpec.getProducerName()).isEqualTo("fooProducer")); } + @Test + void singleCustomizerViaListApi() { + var sender = newSenderFactory().createSender(schema, "topic1", + Collections.singletonList((b) -> b.producerName("fooProducer"))); + assertThatSenderSpecSatisfies(sender, + (senderSpec) -> assertThat(senderSpec.getProducerName()).isEqualTo("fooProducer")); + } + @Test void multipleCustomizers() { var sender = newSenderFactory().createSender(schema, "topic1",