Make SendMessageBuilder interface more navigable (#304)

This commit is contained in:
Christophe Bornet
2023-02-02 16:49:02 +01:00
committed by GitHub
parent 75e235f43a
commit 883aeaaf3c
3 changed files with 98 additions and 74 deletions

View File

@@ -111,49 +111,43 @@ public interface ReactivePulsarOperations<T> {
Flux<MessageId> send(@Nullable String topic, Publisher<T> messages, @Nullable Schema<T> schema);
/**
* Create a {@link SendMessageBuilder builder} for configuring and sending a message
* reactively.
* Create a {@link SendOneMessageBuilder builder} for configuring and sending a
* message reactively.
* @param message the payload of the message
* @return the builder to configure and send the message
*/
SendMessageBuilder<T> newMessage(T message);
SendOneMessageBuilder<T> newMessage(T message);
/**
* Create a {@link SendMessageBuilder builder} for configuring and sending multiple
* messages reactively.
* Create a {@link SendManyMessageBuilder builder} for configuring and sending
* multiple messages reactively.
* @param messages the messages to send
* @return the builder to configure and send the message
*/
SendMessageBuilder<T> newMessages(Publisher<T> messages);
SendManyMessageBuilder<T> newMessages(Publisher<T> messages);
/**
* Builder that can be used to configure and send a message. Provides more options
* than the send methods provided by {@link ReactivePulsarOperations}.
*
* @param <O> the builder type
* @param <T> the message payload type
*/
interface SendMessageBuilder<T> {
sealed interface SendMessageBuilder<O, T> permits SendOneMessageBuilder, SendManyMessageBuilder {
/**
* Specify the topic to send the message to.
* @param topic the destination topic
* @return the current builder with the destination topic specified
*/
SendMessageBuilder<T> withTopic(String topic);
O withTopic(String topic);
/**
* Specify the schema to use when sending the message.
* @param schema the schema to use
* @return the current builder with the schema specified
*/
SendMessageBuilder<T> withSchema(Schema<T> schema);
/**
* Specifies the message customizer to use to further configure the message.
* @param customizer the message customizer
* @return the current builder with the message customizer specified
*/
SendMessageBuilder<T> withMessageCustomizer(MessageSpecBuilderCustomizer<T> customizer);
O withSchema(Schema<T> schema);
/**
* Specifies the customizer to use to further configure the reactive sender
@@ -162,7 +156,18 @@ public interface ReactivePulsarOperations<T> {
* @return the current builder with the reactive sender builder customizer
* specified
*/
SendMessageBuilder<T> withSenderCustomizer(ReactiveMessageSenderBuilderCustomizer<T> customizer);
O withSenderCustomizer(ReactiveMessageSenderBuilderCustomizer<T> customizer);
}
non-sealed interface SendOneMessageBuilder<T> extends SendMessageBuilder<SendOneMessageBuilder<T>, T> {
/**
* Specifies the message customizer to use to further configure the message.
* @param customizer the message customizer
* @return the current builder with the message customizer specified
*/
SendOneMessageBuilder<T> withMessageCustomizer(MessageSpecBuilderCustomizer<T> customizer);
/**
* Send the message in a reactive manner using the configured specification.
@@ -170,12 +175,16 @@ public interface ReactivePulsarOperations<T> {
*/
Mono<MessageId> send();
}
non-sealed interface SendManyMessageBuilder<T> extends SendMessageBuilder<SendManyMessageBuilder<T>, T> {
/**
* Send the messages in a reactive manner using the configured specification.
* @return the ids assigned by the broker to the published messages in the same
* order as they were sent
*/
Flux<MessageId> sendMany();
Flux<MessageId> send();
}

View File

@@ -101,27 +101,27 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
@Override
public Flux<MessageId> send(Publisher<T> messages, @Nullable Schema<T> schema) {
return doSendMany(null, Flux.from(messages), schema);
return doSendMany(null, Flux.from(messages), schema, null);
}
@Override
public Flux<MessageId> send(@Nullable String topic, Publisher<T> messages) {
return doSendMany(topic, Flux.from(messages), null);
return doSendMany(topic, Flux.from(messages), null, null);
}
@Override
public Flux<MessageId> send(@Nullable String topic, Publisher<T> messages, @Nullable Schema<T> schema) {
return doSendMany(topic, Flux.from(messages), schema);
return doSendMany(topic, Flux.from(messages), schema, null);
}
@Override
public SendMessageBuilderImpl<T> newMessage(T message) {
return new SendMessageBuilderImpl<>(this, message);
public SendOneMessageBuilder<T> newMessage(T message) {
return new SendOneMessageBuilderImpl<>(this, message);
}
@Override
public SendMessageBuilder<T> newMessages(Publisher<T> messages) {
return new SendMessageBuilderImpl<>(this, messages);
public SendManyMessageBuilder<T> newMessages(Publisher<T> messages) {
return new SendManyMessageBuilderImpl<>(this, messages);
}
private Mono<MessageId> doSend(@Nullable String topic, T message, @Nullable Schema<T> schema,
@@ -137,12 +137,13 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
// @formatter:on
}
private Flux<MessageId> doSendMany(@Nullable String topic, Flux<T> messages, @Nullable Schema<T> schema) {
private Flux<MessageId> doSendMany(@Nullable String topic, Flux<T> messages, @Nullable Schema<T> schema,
@Nullable ReactiveMessageSenderBuilderCustomizer<T> customizer) {
return messages.switchOnFirst((firstSignal, messageFlux) -> {
T firstMessage = firstSignal.get();
if (firstMessage != null && firstSignal.isOnNext()) {
String topicName = resolveTopic(topic, firstMessage.getClass());
ReactiveMessageSender<T> sender = createMessageSender(topicName, firstMessage, schema, null);
ReactiveMessageSender<T> sender = createMessageSender(topicName, firstMessage, schema, customizer);
return messageFlux.map(MessageSpec::of).as(sender::sendMany)
.doOnError(ex -> this.logger.error(ex,
() -> String.format("Failed to send messages to '%s' topic", topicName)))
@@ -185,68 +186,83 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
return this.reactiveMessageSenderFactory.createSender(resolvedSchema, topic, customizer);
}
public static class SendMessageBuilderImpl<T> implements SendMessageBuilder<T> {
private static class SendMessageBuilderImpl<O, T> {
private final ReactivePulsarTemplate<T> template;
protected final ReactivePulsarTemplate<T> template;
@Nullable
protected String topic;
@Nullable
protected Schema<T> schema;
@Nullable
protected ReactiveMessageSenderBuilderCustomizer<T> senderCustomizer;
SendMessageBuilderImpl(ReactivePulsarTemplate<T> template) {
this.template = template;
}
@SuppressWarnings("unchecked")
public O withTopic(String topic) {
this.topic = topic;
return (O) this;
}
@SuppressWarnings("unchecked")
public O withSchema(Schema<T> schema) {
this.schema = schema;
return (O) this;
}
@SuppressWarnings("unchecked")
public O withSenderCustomizer(ReactiveMessageSenderBuilderCustomizer<T> senderCustomizer) {
this.senderCustomizer = senderCustomizer;
return (O) this;
}
}
private static final class SendOneMessageBuilderImpl<T>
extends SendMessageBuilderImpl<SendOneMessageBuilderImpl<T>, T> implements SendOneMessageBuilder<T> {
private final T message;
private final Publisher<T> messages;
private String topic;
private Schema<T> schema;
@Nullable
private MessageSpecBuilderCustomizer<T> messageCustomizer;
private ReactiveMessageSenderBuilderCustomizer<T> senderCustomizer;
SendMessageBuilderImpl(ReactivePulsarTemplate<T> template, T message) {
this.template = template;
SendOneMessageBuilderImpl(ReactivePulsarTemplate<T> template, T message) {
super(template);
this.message = message;
this.messages = null;
}
SendMessageBuilderImpl(ReactivePulsarTemplate<T> template, Publisher<T> messages) {
this.template = template;
this.message = null;
this.messages = messages;
}
@Override
public SendMessageBuilderImpl<T> withTopic(String topic) {
this.topic = topic;
return this;
}
@Override
public SendMessageBuilderImpl<T> withSchema(Schema<T> schema) {
this.schema = schema;
return this;
}
@Override
public SendMessageBuilderImpl<T> withMessageCustomizer(MessageSpecBuilderCustomizer<T> messageCustomizer) {
public SendOneMessageBuilderImpl<T> withMessageCustomizer(MessageSpecBuilderCustomizer<T> messageCustomizer) {
this.messageCustomizer = messageCustomizer;
return this;
}
@Override
public SendMessageBuilderImpl<T> withSenderCustomizer(
ReactiveMessageSenderBuilderCustomizer<T> senderCustomizer) {
this.senderCustomizer = senderCustomizer;
return this;
}
@Override
public Mono<MessageId> send() {
return this.template.doSend(this.topic, this.message, this.schema, this.messageCustomizer,
this.senderCustomizer);
}
}
private static final class SendManyMessageBuilderImpl<T>
extends SendMessageBuilderImpl<SendManyMessageBuilderImpl<T>, T> implements SendManyMessageBuilder<T> {
private final Publisher<T> messages;
SendManyMessageBuilderImpl(ReactivePulsarTemplate<T> template, Publisher<T> messages) {
super(template);
this.messages = messages;
}
@Override
public Flux<MessageId> sendMany() {
return this.template.doSendMany(this.topic, Flux.from(this.messages), this.schema);
public Flux<MessageId> send() {
return this.template.doSendMany(this.topic, Flux.from(this.messages), this.schema, this.senderCustomizer);
}
}

View File

@@ -81,7 +81,7 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
pulsarTemplate.send(Flux.fromIterable(foos), Schema.JSON(Foo.class)).subscribe();
}
else {
pulsarTemplate.newMessages(Flux.fromIterable(foos)).withSchema(Schema.JSON(Foo.class)).sendMany()
pulsarTemplate.newMessages(Flux.fromIterable(foos)).withSchema(Schema.JSON(Foo.class)).send()
.subscribe();
}
@@ -121,7 +121,7 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
pulsarTemplate.send(Flux.fromIterable(foos)).subscribe();
}
else {
pulsarTemplate.newMessages(Flux.fromIterable(foos)).sendMany().subscribe();
pulsarTemplate.newMessages(Flux.fromIterable(foos)).send().subscribe();
}
// TODO figure out if expected to not be ordered when schema not set on
@@ -180,11 +180,10 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
.send(Flux.fromIterable(data), Schema.STRING).subscribe()),
arguments("fluentApiNoSchema",
(BiConsumer<List<String>, ReactivePulsarTemplate<String>>) (data, template) -> template
.newMessages(Flux.fromIterable(data)).sendMany().subscribe()),
.newMessages(Flux.fromIterable(data)).send().subscribe()),
arguments("fluentApiWithSchema",
(BiConsumer<List<String>, ReactivePulsarTemplate<String>>) (data, template) -> template
.newMessages(Flux.fromIterable(data)).withSchema(Schema.STRING).sendMany()
.subscribe()));
.newMessages(Flux.fromIterable(data)).withSchema(Schema.STRING).send().subscribe()));
}
@ParameterizedTest(name = "{0}")
@@ -230,7 +229,7 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
}
}
else {
ReactivePulsarTemplate.SendMessageBuilderImpl<String> messageBuilder = pulsarTemplate
ReactivePulsarTemplate.SendOneMessageBuilder<String> messageBuilder = pulsarTemplate
.newMessage(msgPayload);
if (testArgs.explicitTopic) {
messageBuilder = messageBuilder.withTopic(topic);