Allow topic/schema resolution in ReactivePulsarTemplate sendMany (#303)

- Use switchOnFirst to determine the Schema and topic from the first message in doSendMany 
- Fixes ordering issue when `doSendMany` did not have schema (see #300)
This commit is contained in:
Christophe Bornet
2023-02-02 04:20:44 +01:00
committed by GitHub
parent c288e9e496
commit 75e235f43a
5 changed files with 41 additions and 78 deletions

View File

@@ -49,18 +49,19 @@ public class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSend
private final ReactiveMessageSenderSpec reactiveMessageSenderSpec;
@Nullable
private final ReactiveMessageSenderCache reactiveMessageSenderCache;
public DefaultReactivePulsarSenderFactory(PulsarClient pulsarClient,
ReactiveMessageSenderSpec reactiveMessageSenderSpec,
ReactiveMessageSenderCache reactiveMessageSenderCache) {
@Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec,
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache) {
this(AdaptedReactivePulsarClientFactory.create(pulsarClient), reactiveMessageSenderSpec,
reactiveMessageSenderCache);
}
public DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
ReactiveMessageSenderSpec reactiveMessageSenderSpec,
ReactiveMessageSenderCache reactiveMessageSenderCache) {
@Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec,
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache) {
this.reactivePulsarClient = reactivePulsarClient;
this.reactiveMessageSenderSpec = new ImmutableReactiveMessageSenderSpec(
reactiveMessageSenderSpec != null ? reactiveMessageSenderSpec : new MutableReactiveMessageSenderSpec());

View File

@@ -126,16 +126,6 @@ public interface ReactivePulsarOperations<T> {
*/
SendMessageBuilder<T> newMessages(Publisher<T> messages);
/**
* Create a {@link SendMessageBuilder builder} for configuring and sending multiple
* messages reactively.
* @param messages the messages to send
* @param messageType the type of messages being sent - helpful for topic resolution
* when schema is not specified during a {@code sendMany} operation
* @return the builder to configure and send the message
*/
SendMessageBuilder<T> newMessages(Publisher<T> messages, Class<T> messageType);
/**
* Builder that can be used to configure and send a message. Provides more options
* than the send methods provided by {@link ReactivePulsarOperations}.

View File

@@ -101,17 +101,17 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
@Override
public Flux<MessageId> send(Publisher<T> messages, @Nullable Schema<T> schema) {
return doSendMany(null, Flux.from(messages), null, schema);
return doSendMany(null, Flux.from(messages), schema);
}
@Override
public Flux<MessageId> send(@Nullable String topic, Publisher<T> messages) {
return doSendMany(topic, Flux.from(messages), null, null);
return doSendMany(topic, Flux.from(messages), null);
}
@Override
public Flux<MessageId> send(@Nullable String topic, Publisher<T> messages, @Nullable Schema<T> schema) {
return doSendMany(topic, Flux.from(messages), null, schema);
return doSendMany(topic, Flux.from(messages), schema);
}
@Override
@@ -121,12 +121,7 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
@Override
public SendMessageBuilder<T> newMessages(Publisher<T> messages) {
return new SendMessageBuilderImpl<>(this, messages, null);
}
@Override
public SendMessageBuilder<T> newMessages(Publisher<T> messages, Class<T> messageType) {
return new SendMessageBuilderImpl<>(this, messages, messageType);
return new SendMessageBuilderImpl<>(this, messages);
}
private Mono<MessageId> doSend(@Nullable String topic, T message, @Nullable Schema<T> schema,
@@ -142,25 +137,21 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
// @formatter:on
}
private Flux<MessageId> doSendMany(@Nullable String topic, Flux<T> messages, @Nullable Class<T> messageType,
@Nullable Schema<T> schema) {
if (schema != null) {
/*
* If schema specified we can create the message sender right away and use
* ReactiveMessageSender::sendMany to send them as a stream. Otherwise, we
* need to wait to get a message to create the sender, and we can't share it
* between messages. So we create one each time and use
* ReactiveMessageSender::sendOne to send messages individually.
*/
String topicName = resolveTopic(topic, messageType);
ReactiveMessageSender<T> sender = createMessageSender(topicName, null, schema, null);
return messages.map(MessageSpec::of).as(sender::sendMany)
.doOnError(ex -> this.logger.error(ex,
() -> String.format("Failed to send messages to '%s' topic", topicName)))
.doOnNext(
msgId -> this.logger.trace(() -> String.format("Sent messages to '%s' topic", topicName)));
}
return messages.flatMapSequential(message -> doSend(topic, message, null, null, null));
private Flux<MessageId> doSendMany(@Nullable String topic, Flux<T> messages, @Nullable Schema<T> schema) {
return messages.switchOnFirst((firstSignal, messageFlux) -> {
T firstMessage = firstSignal.get();
if (firstMessage != null && firstSignal.isOnNext()) {
String topicName = resolveTopic(topic, firstMessage.getClass());
ReactiveMessageSender<T> sender = createMessageSender(topicName, firstMessage, schema, null);
return messageFlux.map(MessageSpec::of).as(sender::sendMany)
.doOnError(ex -> this.logger.error(ex,
() -> String.format("Failed to send messages to '%s' topic", topicName)))
.doOnNext(msgId -> this.logger
.trace(() -> String.format("Sent messages to '%s' topic", topicName)));
}
// The flux has errored or is completed
return messageFlux.thenMany(Flux.empty());
});
}
private String resolveTopic(@Nullable String topic, @Nullable Class<?> messageType) {
@@ -187,10 +178,11 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
private ReactiveMessageSender<T> createMessageSender(@Nullable String topic, T message, @Nullable Schema<T> schema,
@Nullable ReactiveMessageSenderBuilderCustomizer<T> customizer) {
if (schema == null) {
schema = this.schemaResolver.getSchema(message);
Schema<T> resolvedSchema = schema == null ? this.schemaResolver.getSchema(message) : schema;
if (resolvedSchema == null) {
throw new IllegalArgumentException("Couldn't resolve a schema for the message");
}
return this.reactiveMessageSenderFactory.createSender(schema, topic, customizer);
return this.reactiveMessageSenderFactory.createSender(resolvedSchema, topic, customizer);
}
public static class SendMessageBuilderImpl<T> implements SendMessageBuilder<T> {
@@ -201,9 +193,6 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
private final Publisher<T> messages;
@Nullable
private Class<T> messageType;
private String topic;
private Schema<T> schema;
@@ -216,15 +205,12 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
this.template = template;
this.message = message;
this.messages = null;
this.messageType = null;
}
SendMessageBuilderImpl(ReactivePulsarTemplate<T> template, Publisher<T> messages,
@Nullable Class<T> messageType) {
SendMessageBuilderImpl(ReactivePulsarTemplate<T> template, Publisher<T> messages) {
this.template = template;
this.message = null;
this.messages = messages;
this.messageType = messageType;
}
@Override
@@ -260,7 +246,7 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
@Override
public Flux<MessageId> sendMany() {
return this.template.doSendMany(this.topic, Flux.from(this.messages), this.messageType, this.schema);
return this.template.doSendMany(this.topic, Flux.from(this.messages), this.schema);
}
}

View File

@@ -86,8 +86,8 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
}
for (int i = 0; i < 10; i++) {
assertThat(consumer.receiveAsync()).succeedsWithin(Duration.ofSeconds(3))
.extracting(Message::getValue).isEqualTo(foos.get(i));
assertThat(consumer.receiveAsync().thenApply(Message::getValue))
.succeedsWithin(Duration.ofSeconds(3)).isEqualTo(foos.get(i));
}
}
}
@@ -139,7 +139,7 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
@ParameterizedTest(name = "{0}")
@MethodSource("sendManyWithInferredTopicProvider")
void sendManyWithInferredTopic(String testName, boolean shouldPass,
void sendManyWithInferredTopic(String testName,
BiConsumer<List<String>, ReactivePulsarTemplate<String>> sendHandler) throws Exception {
String topic = "rptt-" + testName + "-topic";
String sub = "rptt-" + testName + "-sub";
@@ -163,40 +163,25 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
sendHandler.accept(Collections.singletonList(theSingleFoo), pulsarTemplate);
if (shouldPass) {
assertThat(consumer.receiveAsync()).succeedsWithin(Duration.ofSeconds(3))
.extracting(Message::getValue).isEqualTo(theSingleFoo);
}
else {
assertThat(consumer.receive(3, TimeUnit.SECONDS)).isNull();
}
assertThat(consumer.receiveAsync()).succeedsWithin(Duration.ofSeconds(3)).extracting(Message::getValue)
.isEqualTo(theSingleFoo);
}
}
}
static Stream<Arguments> sendManyWithInferredTopicProvider() {
// NOTE: Topic resolution requires in ReactivePulsarTemplate.sendMany requires
// the messageType to be set when the schema is specified as it creates the
// message sender spec before it subscribes to the publisher of messages (iow
// it does not have access to a message to determine message type in this case
return Stream.of(
arguments("simpleApiNoSchemaPasses", true,
arguments("simpleApiNoSchema",
(BiConsumer<List<String>, ReactivePulsarTemplate<String>>) (data, template) -> template
.send(Flux.fromIterable(data)).subscribe()),
arguments("simpleApiWithSchemaAlwaysFails", false,
arguments("simpleApiWithSchema",
(BiConsumer<List<String>, ReactivePulsarTemplate<String>>) (data, template) -> template
.send(Flux.fromIterable(data), Schema.STRING).subscribe()),
arguments("fluentApiNoSchemaWithMsgTypePasses", true,
(BiConsumer<List<String>, ReactivePulsarTemplate<String>>) (data, template) -> template
.newMessages(Flux.fromIterable(data), String.class).sendMany().subscribe()),
arguments("fluentApiNoSchemaNoMsgTypePasses", true,
arguments("fluentApiNoSchema",
(BiConsumer<List<String>, ReactivePulsarTemplate<String>>) (data, template) -> template
.newMessages(Flux.fromIterable(data)).sendMany().subscribe()),
arguments("fluentApiWithSchemaWithMsgTypePasses", true,
(BiConsumer<List<String>, ReactivePulsarTemplate<String>>) (data, template) -> template
.newMessages(Flux.fromIterable(data), String.class).withSchema(Schema.STRING).sendMany()
.subscribe()),
arguments("fluentApiWithSchemaNoMsgTypeFails", false,
arguments("fluentApiWithSchema",
(BiConsumer<List<String>, ReactivePulsarTemplate<String>>) (data, template) -> template
.newMessages(Flux.fromIterable(data)).withSchema(Schema.STRING).sendMany()
.subscribe()));

View File

@@ -302,6 +302,7 @@ class DefaultReactivePulsarMessageListenerContainerTests implements PulsarTestCo
container.setConsumerCustomizer(b -> b.deadLetterPolicy(deadLetterPolicy));
container.start();
MutableReactiveMessageSenderSpec prodConfig = new MutableReactiveMessageSenderSpec();
prodConfig.setBatchingEnabled(false);
prodConfig.setTopicName(topic);
DefaultReactivePulsarSenderFactory<String> pulsarProducerFactory = new DefaultReactivePulsarSenderFactory<>(
reactivePulsarClient, prodConfig, null);