Trim ReactivePulsarSenderFactory API (#320)
* The `createSender(Schema)` is not useful in that most users will specify a topic and not rely on the default topic.
This commit is contained in:
@@ -18,6 +18,7 @@ package org.springframework.pulsar.reactive.core;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.pulsar.client.api.PulsarClient;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
@@ -68,11 +69,6 @@ public class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSend
|
||||
this.reactiveMessageSenderCache = reactiveMessageSenderCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReactiveMessageSender<T> createSender(Schema<T> schema) {
|
||||
return doCreateReactiveMessageSender(schema, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String topic) {
|
||||
return doCreateReactiveMessageSender(schema, topic, null);
|
||||
@@ -93,7 +89,7 @@ public class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSend
|
||||
|
||||
private ReactiveMessageSender<T> doCreateReactiveMessageSender(Schema<T> schema, @Nullable String topic,
|
||||
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
|
||||
|
||||
Objects.requireNonNull(schema, "Schema must be specified");
|
||||
String resolvedTopic = ReactiveMessageSenderUtils.resolveTopicName(topic, this);
|
||||
this.logger.trace(() -> "Creating reactive message sender for '%s' topic".formatted(resolvedTopic));
|
||||
ReactiveMessageSenderBuilder<T> sender = this.reactivePulsarClient.messageSender(schema);
|
||||
|
||||
@@ -33,13 +33,6 @@ import org.springframework.lang.Nullable;
|
||||
*/
|
||||
public interface ReactivePulsarSenderFactory<T> {
|
||||
|
||||
/**
|
||||
* Create a reactive message sender that will send messages to the default topic.
|
||||
* @param schema the schema of the messages to be sent
|
||||
* @return the reactive message sender
|
||||
*/
|
||||
ReactiveMessageSender<T> createSender(Schema<T> schema);
|
||||
|
||||
/**
|
||||
* Create a reactive message sender.
|
||||
* @param topic the topic to send messages to or {@code null} to use the default topic
|
||||
|
||||
@@ -18,8 +18,10 @@ package org.springframework.pulsar.reactive.core;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
|
||||
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.apache.pulsar.client.api.CompressionType;
|
||||
import org.apache.pulsar.client.api.PulsarClient;
|
||||
@@ -76,26 +78,15 @@ class DefaultReactiveMessageSenderFactoryTests {
|
||||
return new DefaultReactivePulsarSenderFactory<>((PulsarClient) null, null, cache);
|
||||
}
|
||||
|
||||
@Nested
|
||||
class CreateSenderSchemaOnlyApi {
|
||||
|
||||
@Test
|
||||
void withDefaultTopic() {
|
||||
var sender = newSenderFactoryWithDefaultTopic("topic0").createSender(schema);
|
||||
assertThatSenderHasTopic(sender, "topic0");
|
||||
}
|
||||
|
||||
@Test
|
||||
void withoutDefaultTopic() {
|
||||
assertThatIllegalArgumentException().isThrownBy(() -> newSenderFactory().createSender(schema))
|
||||
.withMessageContaining("Topic must be specified when no default topic is configured");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Nested
|
||||
class CreateSenderSchemaAndTopicApi {
|
||||
|
||||
@Test
|
||||
void withoutSchema() {
|
||||
assertThatNullPointerException().isThrownBy(() -> newSenderFactory().createSender(null, "topic0"))
|
||||
.withMessageContaining("Schema must be specified");
|
||||
}
|
||||
|
||||
@Test
|
||||
void topicSpecifiedWithDefaultTopic() {
|
||||
var sender = newSenderFactoryWithDefaultTopic("topic0").createSender(schema, "topic1");
|
||||
@@ -132,6 +123,14 @@ class DefaultReactiveMessageSenderFactoryTests {
|
||||
(senderSpec) -> assertThat(senderSpec.getProducerName()).isEqualTo("fooProducer"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void singleCustomizerViaListApi() {
|
||||
var sender = newSenderFactory().createSender(schema, "topic1",
|
||||
Collections.singletonList((b) -> b.producerName("fooProducer")));
|
||||
assertThatSenderSpecSatisfies(sender,
|
||||
(senderSpec) -> assertThat(senderSpec.getProducerName()).isEqualTo("fooProducer"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void multipleCustomizers() {
|
||||
var sender = newSenderFactory().createSender(schema, "topic1",
|
||||
|
||||
Reference in New Issue
Block a user