Remove setSchema API from templates

For both PulsarTemplate and ReactivePulsarTemplate:
* remove setSchema()
* add schema param in simple API
* add schema param in fluent API

Update all tests and samples accordingly

See #268
This commit is contained in:
Chris Bono
2023-01-25 11:39:49 -06:00
committed by Soby Chacko
parent 9478c491fd
commit ed2a67b0c4
15 changed files with 593 additions and 409 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import java.util.Optional;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;
/**
@@ -32,7 +33,7 @@ final class ReactiveMessageSenderUtils {
private ReactiveMessageSenderUtils() {
}
static <T> String resolveTopicName(String userSpecifiedTopic,
static <T> String resolveTopicName(@Nullable String userSpecifiedTopic,
ReactivePulsarSenderFactory<T> reactiveMessageSenderFactory) {
ReactiveMessageSenderSpec reactiveMessageSenderSpec = reactiveMessageSenderFactory
.getReactiveMessageSenderSpec();

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,8 +17,11 @@
package org.springframework.pulsar.reactive.core;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.reactivestreams.Publisher;
import org.springframework.lang.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -27,6 +30,7 @@ import reactor.core.publisher.Mono;
*
* @param <T> the message payload type
* @author Christophe Bornet
* @author Chris Bono
*/
public interface ReactivePulsarOperations<T> {
@@ -37,6 +41,15 @@ public interface ReactivePulsarOperations<T> {
*/
Mono<MessageId> send(T message);
/**
* Sends a message to the specified topic in a reactive manner. default topic
* @param message the message to send
* @param schema the schema to use or {@code null} to use the default schema
* resolution
* @return the id assigned by the broker to the published message
*/
Mono<MessageId> send(T message, @Nullable Schema<T> schema);
/**
* Sends a message to the specified topic in a reactive manner.
* @param topic the topic to send the message to or {@code null} to send to the
@@ -44,7 +57,18 @@ public interface ReactivePulsarOperations<T> {
* @param message the message to send
* @return the id assigned by the broker to the published message
*/
Mono<MessageId> send(String topic, T message);
Mono<MessageId> send(@Nullable String topic, T message);
/**
* Sends a message to the specified topic in a reactive manner.
* @param topic the topic to send the message to or {@code null} to send to the
* default topic
* @param message the message to send
* @param schema the schema to use or {@code null} to use the default schema
* resolution
* @return the id assigned by the broker to the published message
*/
Mono<MessageId> send(@Nullable String topic, T message, @Nullable Schema<T> schema);
/**
* Sends multiple messages to the default topic in a reactive manner.
@@ -54,6 +78,16 @@ public interface ReactivePulsarOperations<T> {
*/
Flux<MessageId> send(Publisher<T> messages);
/**
* Sends multiple messages to the default topic in a reactive manner.
* @param messages the messages to send
* @param schema the schema to use or {@code null} to use the default schema
* resolution
* @return the ids assigned by the broker to the published messages in the same order
* as they were sent
*/
Flux<MessageId> send(Publisher<T> messages, @Nullable Schema<T> schema);
/**
* Sends multiple messages to the specified topic in a reactive manner.
* @param topic the topic to send the message to or {@code null} to send to the
@@ -62,7 +96,19 @@ public interface ReactivePulsarOperations<T> {
* @return the ids assigned by the broker to the published messages in the same order
* as they were sent
*/
Flux<MessageId> send(String topic, Publisher<T> messages);
Flux<MessageId> send(@Nullable String topic, Publisher<T> messages);
/**
* Sends multiple messages to the specified topic in a reactive manner.
* @param topic the topic to send the message to or {@code null} to send to the
* default topic
* @param messages the messages to send
* @param schema the schema to use or {@code null} to use the default schema
* resolution
* @return the ids assigned by the broker to the published messages in the same order
* as they were sent
*/
Flux<MessageId> send(@Nullable String topic, Publisher<T> messages, @Nullable Schema<T> schema);
/**
* Create a {@link SendMessageBuilder builder} for configuring and sending a message
@@ -72,6 +118,14 @@ public interface ReactivePulsarOperations<T> {
*/
SendMessageBuilder<T> newMessage(T message);
/**
* Create a {@link SendMessageBuilder builder} for configuring and sending multiple
* messages reactively.
* @param messages the messages to send
* @return the builder to configure and send the message
*/
SendMessageBuilder<T> newMessages(Publisher<T> messages);
/**
* Builder that can be used to configure and send a message. Provides more options
* than the send methods provided by {@link ReactivePulsarOperations}.
@@ -87,6 +141,13 @@ public interface ReactivePulsarOperations<T> {
*/
SendMessageBuilder<T> withTopic(String topic);
/**
* Specify the schema to use when sending the message.
* @param schema the schema to use
* @return the current builder with the schema specified
*/
SendMessageBuilder<T> withSchema(Schema<T> schema);
/**
* Specifies the message customizer to use to further configure the message.
* @param customizer the message customizer
@@ -109,6 +170,13 @@ public interface ReactivePulsarOperations<T> {
*/
Mono<MessageId> send();
/**
* Send the messages in a reactive manner using the configured specification.
* @return the ids assigned by the broker to the published messages in the same
* order as they were sent
*/
Flux<MessageId> sendMany();
}
}

View File

@@ -34,7 +34,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* A thread-safe template for executing high-level reactive Pulsar operations.
* A template for executing high-level reactive Pulsar operations.
*
* @param <T> the message payload type
* @author Christophe Bornet
@@ -47,9 +47,6 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
private final SchemaResolver schemaResolver;
@Nullable
private Schema<T> schema;
/**
* Construct a template instance that uses the default schema resolver.
* @param reactiveMessageSenderFactory the factory used to create the backing Pulsar
@@ -76,8 +73,18 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
}
@Override
public Mono<MessageId> send(String topic, T message) {
return doSend(topic, message, null, null);
public Mono<MessageId> send(T message, @Nullable Schema<T> schema) {
return doSend(null, message, schema, null, null);
}
@Override
public Mono<MessageId> send(@Nullable String topic, T message) {
return doSend(topic, message, null, null, null);
}
@Override
public Mono<MessageId> send(@Nullable String topic, T message, @Nullable Schema<T> schema) {
return doSend(topic, message, schema, null, null);
}
@Override
@@ -86,8 +93,18 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
}
@Override
public Flux<MessageId> send(String topic, Publisher<T> messages) {
return doSendMany(topic, Flux.from(messages));
public Flux<MessageId> send(Publisher<T> messages, @Nullable Schema<T> schema) {
return doSendMany(null, Flux.from(messages), schema);
}
@Override
public Flux<MessageId> send(@Nullable String topic, Publisher<T> messages) {
return doSendMany(topic, Flux.from(messages), null);
}
@Override
public Flux<MessageId> send(@Nullable String topic, Publisher<T> messages, @Nullable Schema<T> schema) {
return doSendMany(topic, Flux.from(messages), schema);
}
@Override
@@ -95,62 +112,64 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
return new SendMessageBuilderImpl<>(this, message);
}
/**
* Set the schema to use on this template.
* @param schema provides the {@link Schema} used on this template
*/
public void setSchema(Schema<T> schema) {
this.schema = schema;
@Override
public SendMessageBuilder<T> newMessages(Publisher<T> messages) {
return new SendMessageBuilderImpl<>(this, messages);
}
private Mono<MessageId> doSend(String topic, T message,
MessageSpecBuilderCustomizer<T> messageSpecBuilderCustomizer,
ReactiveMessageSenderBuilderCustomizer<T> customizer) {
private Mono<MessageId> doSend(@Nullable String topic, T message, @Nullable Schema<T> schema,
@Nullable MessageSpecBuilderCustomizer<T> messageSpecBuilderCustomizer,
@Nullable ReactiveMessageSenderBuilderCustomizer<T> customizer) {
String topicName = ReactiveMessageSenderUtils.resolveTopicName(topic, this.reactiveMessageSenderFactory);
this.logger.trace(() -> String.format("Sending reactive message to '%s' topic", topicName));
ReactiveMessageSender<T> sender = createMessageSender(topic, message, customizer);
return sender.sendOne(getMessageSpec(messageSpecBuilderCustomizer, message)).doOnError(
ex -> this.logger.error(ex, () -> String.format("Failed to send message to '%s' topic", topicName)))
// NOTE: We do not pass the resolved topic name from above as it handles the
// resolve itself
ReactiveMessageSender<T> sender = createMessageSender(topic, message, schema, customizer);
// @formatter:off
return sender.sendOne(getMessageSpec(messageSpecBuilderCustomizer, message))
.doOnError(ex -> this.logger.error(ex, () -> String.format("Failed to send message to '%s' topic", topicName)))
.doOnSuccess(msgId -> this.logger.trace(() -> String.format("Sent message to '%s' topic", topicName)));
// @formatter:on
}
private Flux<MessageId> doSendMany(String topic, Flux<T> messages) {
private Flux<MessageId> doSendMany(@Nullable String topic, Flux<T> messages, @Nullable Schema<T> schema) {
String topicName = ReactiveMessageSenderUtils.resolveTopicName(topic, this.reactiveMessageSenderFactory);
this.logger.trace(() -> String.format("Sending reactive messages to '%s' topic", topicName));
if (this.schema != null) {
if (schema != null) {
/*
* If the template has a schema, we can create the message sender right away
* and use ReactiveMessageSender::sendMany to send them as a stream.
* Otherwise, we need to wait to get a message to create it and we can't share
* it between messages. So we create one each time and use
* If schema specified we can create the message sender right away and use
* ReactiveMessageSender::sendMany to send them as a stream. Otherwise, we
* need to wait to get a message to create the sender, and we can't share it
* between messages. So we create one each time and use
* ReactiveMessageSender::sendOne to send messages individually.
*/
ReactiveMessageSender<T> sender = createMessageSender(topic, null, null);
// NOTE: We do not pass the resolved topic name from above as it handles the
// resolve itself
ReactiveMessageSender<T> sender = createMessageSender(topic, null, schema, null);
return messages.map(MessageSpec::of).as(sender::sendMany)
.doOnError(ex -> this.logger.error(ex,
() -> String.format("Failed to send messages to '%s' topic", topicName)))
.doOnNext(
msgId -> this.logger.trace(() -> String.format("Sent messages to '%s' topic", topicName)));
}
return messages.flatMapSequential(message -> doSend(topic, message, null, null));
return messages.flatMapSequential(message -> doSend(topic, message, schema, null, null));
}
private static <T> MessageSpec<T> getMessageSpec(MessageSpecBuilderCustomizer<T> messageSpecBuilderCustomizer,
T message) {
private static <T> MessageSpec<T> getMessageSpec(
@Nullable MessageSpecBuilderCustomizer<T> messageSpecBuilderCustomizer, T message) {
MessageSpecBuilder<T> messageSpecBuilder = MessageSpec.builder(message);
if (messageSpecBuilderCustomizer != null) {
messageSpecBuilderCustomizer.customize(messageSpecBuilder);
}
return messageSpecBuilder.build();
}
private ReactiveMessageSender<T> createMessageSender(String topic, T message,
ReactiveMessageSenderBuilderCustomizer<T> customizer) {
Schema<T> schema = this.schema != null ? this.schema : this.schemaResolver.getSchema(message);
private ReactiveMessageSender<T> createMessageSender(@Nullable String topic, T message, @Nullable Schema<T> schema,
@Nullable ReactiveMessageSenderBuilderCustomizer<T> customizer) {
if (schema == null) {
schema = this.schemaResolver.getSchema(message);
}
return this.reactiveMessageSenderFactory.createSender(topic, schema,
customizer == null ? Collections.emptyList() : Collections.singletonList(customizer));
}
@@ -161,8 +180,12 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
private final T message;
private final Publisher<T> messages;
private String topic;
private Schema<T> schema;
private MessageSpecBuilderCustomizer<T> messageCustomizer;
private ReactiveMessageSenderBuilderCustomizer<T> senderCustomizer;
@@ -170,6 +193,13 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
SendMessageBuilderImpl(ReactivePulsarTemplate<T> template, T message) {
this.template = template;
this.message = message;
this.messages = null;
}
SendMessageBuilderImpl(ReactivePulsarTemplate<T> template, Publisher<T> messages) {
this.template = template;
this.message = null;
this.messages = messages;
}
@Override
@@ -178,6 +208,12 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
return this;
}
@Override
public SendMessageBuilderImpl<T> withSchema(Schema<T> schema) {
this.schema = schema;
return this;
}
@Override
public SendMessageBuilderImpl<T> withMessageCustomizer(MessageSpecBuilderCustomizer<T> messageCustomizer) {
this.messageCustomizer = messageCustomizer;
@@ -193,7 +229,13 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
@Override
public Mono<MessageId> send() {
return this.template.doSend(this.topic, this.message, this.messageCustomizer, this.senderCustomizer);
return this.template.doSend(this.topic, this.message, this.schema, this.messageCustomizer,
this.senderCustomizer);
}
@Override
public Flux<MessageId> sendMany() {
return this.template.doSendMany(this.topic, Flux.from(this.messages), this.schema);
}
}

View File

@@ -35,10 +35,10 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.PulsarTestContainerSupport;
@@ -50,29 +50,37 @@ import reactor.core.publisher.Mono;
* Tests for {@link org.springframework.pulsar.reactive.core.ReactivePulsarTemplate}.
*
* @author Christophe Bornet
* @author Chris Bono
*/
class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
@Test
void sendMessagesWithSpecificSchema() throws Exception {
String topic = "smt-specific-schema-reactive-topic";
@ParameterizedTest
@ValueSource(booleans = { true, false })
void sendMessagesWithSpecificSchema(boolean useSimpleApi) throws Exception {
String topic = "rptt-sendMessagesWithSpecificSchema-" + useSimpleApi + "-topic";
String sub = "rptt-sendMessagesWithSpecificSchema-" + useSimpleApi + "-sub";
try (PulsarClient client = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl())
.build()) {
try (Consumer<Foo> consumer = client.newConsumer(Schema.JSON(Foo.class)).topic(topic)
.subscriptionName("smt-specific-schema-reactive-sub").subscribe()) {
try (Consumer<Foo> consumer = client.newConsumer(Schema.JSON(Foo.class)).topic(topic).subscriptionName(sub)
.subscribe()) {
MutableReactiveMessageSenderSpec senderSpec = new MutableReactiveMessageSenderSpec();
senderSpec.setTopicName(topic);
org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory<Foo> producerFactory = new org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory<>(
client, senderSpec, null);
org.springframework.pulsar.reactive.core.ReactivePulsarTemplate<Foo> pulsarTemplate = new org.springframework.pulsar.reactive.core.ReactivePulsarTemplate<>(
producerFactory);
pulsarTemplate.setSchema(Schema.JSON(Foo.class));
ReactivePulsarSenderFactory<Foo> producerFactory = new DefaultReactivePulsarSenderFactory<>(client,
senderSpec, null);
ReactivePulsarTemplate<Foo> pulsarTemplate = new ReactivePulsarTemplate<>(producerFactory);
List<Foo> foos = new ArrayList<>();
for (int i = 0; i < 10; i++) {
foos.add(new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID()));
}
pulsarTemplate.send(Flux.fromIterable(foos)).subscribe();
if (useSimpleApi) {
pulsarTemplate.send(Flux.fromIterable(foos), Schema.JSON(Foo.class)).subscribe();
}
else {
pulsarTemplate.newMessages(Flux.fromIterable(foos)).withSchema(Schema.JSON(Foo.class)).sendMany()
.subscribe();
}
for (int i = 0; i < 10; i++) {
assertThat(consumer.receiveAsync()).succeedsWithin(Duration.ofSeconds(3))
@@ -82,31 +90,39 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
}
}
@Test
void sendMessagesWithSpecificSchemaAndCustomTypeMappings() throws Exception {
String topic = "smt-specific-schema-custom-reactive-topic";
@ParameterizedTest
@ValueSource(booleans = { true, false })
void sendMessagesWithInferredSchema(boolean useSimpleApi) throws Exception {
String topic = "rptt-sendMessagesWithInferredSchema-" + useSimpleApi + "-topic";
String sub = "rptt-sendMessagesWithInferredSchema-" + useSimpleApi + "-sub";
try (PulsarClient client = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl())
.build()) {
try (Consumer<Foo> consumer = client.newConsumer(Schema.JSON(Foo.class)).topic(topic)
.subscriptionName("smt-specific-schema-custom-reactive-sub").subscribe()) {
try (Consumer<Foo> consumer = client.newConsumer(Schema.JSON(Foo.class)).topic(topic).subscriptionName(sub)
.subscribe()) {
MutableReactiveMessageSenderSpec senderSpec = new MutableReactiveMessageSenderSpec();
senderSpec.setTopicName(topic);
org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory<Foo> producerFactory = new org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory<>(
client, senderSpec, null);
// Custom schema resolver allows not calling setSchema on template
ReactivePulsarSenderFactory<Foo> producerFactory = new DefaultReactivePulsarSenderFactory<>(client,
senderSpec, null);
// Custom schema resolver allows not specifying the schema when sending
DefaultSchemaResolver schemaResolver = new DefaultSchemaResolver();
schemaResolver.addCustomSchemaMapping(Foo.class, Schema.JSON(Foo.class));
org.springframework.pulsar.reactive.core.ReactivePulsarTemplate<Foo> pulsarTemplate = new org.springframework.pulsar.reactive.core.ReactivePulsarTemplate<>(
producerFactory, schemaResolver);
ReactivePulsarTemplate<Foo> pulsarTemplate = new ReactivePulsarTemplate<>(producerFactory,
schemaResolver);
List<Foo> foos = new ArrayList<>();
for (int i = 0; i < 10; i++) {
foos.add(new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID()));
}
pulsarTemplate.send(Flux.fromIterable(foos)).subscribe();
// TODO figure out why ordering is not preserved when template does not
// have schema set
if (useSimpleApi) {
pulsarTemplate.send(Flux.fromIterable(foos)).subscribe();
}
else {
pulsarTemplate.newMessages(Flux.fromIterable(foos)).sendMany().subscribe();
}
// TODO figure out if expected to not be ordered when schema not set on
// template
List<Foo> foos2 = new ArrayList<>();
for (int i = 0; i < 10; i++) {
CompletableFuture<Message<Foo>> receiveFuture = consumer.receiveAsync();
@@ -126,39 +142,49 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
String subscription = topic + "-sub";
String msgPayload = topic + "-msg";
MessageSpecBuilderCustomizer<String> messageCustomizer = null;
if (testArgs.useMessageCustomizer) {
if (testArgs.messageCustomizer) {
messageCustomizer = (mb) -> mb.key("foo-key");
}
ReactiveMessageSenderBuilderCustomizer<String> senderCustomizer = null;
if (testArgs.useSenderCustomizer) {
senderCustomizer = (sb) -> sb.producerName("foo-producer");
if (testArgs.senderCustomizer) {
senderCustomizer = (sb) -> sb.producerName("foo-sender");
}
try (PulsarClient client = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl())
.build()) {
try (Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topic)
.subscriptionName(subscription).subscribe()) {
MutableReactiveMessageSenderSpec senderSpec = new MutableReactiveMessageSenderSpec();
if (!testArgs.useSpecificTopic) {
if (!testArgs.explicitTopic) {
senderSpec.setTopicName(topic);
}
ReactivePulsarSenderFactory<String> senderFactory = new DefaultReactivePulsarSenderFactory<>(client,
senderSpec, null);
org.springframework.pulsar.reactive.core.ReactivePulsarTemplate<String> pulsarTemplate = new org.springframework.pulsar.reactive.core.ReactivePulsarTemplate<>(
senderFactory);
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory);
Mono<MessageId> sendResponse;
if (testArgs.useTemplateSchema) {
pulsarTemplate.setSchema(Schema.STRING);
}
if (testArgs.useSimpleApi) {
sendResponse = testArgs.useSpecificTopic ? pulsarTemplate.send(topic, msgPayload)
: pulsarTemplate.send(msgPayload);
if (testArgs.simpleApi) {
if (testArgs.explicitSchema && testArgs.explicitTopic) {
sendResponse = pulsarTemplate.send(topic, msgPayload, Schema.STRING);
}
else if (testArgs.explicitSchema) {
sendResponse = pulsarTemplate.send(msgPayload, Schema.STRING);
}
else if (testArgs.explicitTopic) {
sendResponse = pulsarTemplate.send(topic, msgPayload);
}
else {
sendResponse = pulsarTemplate.send(msgPayload);
}
}
else {
ReactivePulsarTemplate.SendMessageBuilderImpl<String> messageBuilder = pulsarTemplate
.newMessage(msgPayload);
if (testArgs.useSpecificTopic) {
if (testArgs.explicitTopic) {
messageBuilder = messageBuilder.withTopic(topic);
}
if (testArgs.explicitSchema) {
messageBuilder = messageBuilder.withSchema(Schema.STRING);
}
if (messageCustomizer != null) {
messageBuilder = messageBuilder.withMessageCustomizer(messageCustomizer);
}
@@ -177,7 +203,7 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
assertThat(msg.getKey()).isEqualTo("foo-key");
}
if (senderCustomizer != null) {
assertThat(msg.getProducerName()).isEqualTo("foo-producer");
assertThat(msg.getProducerName()).isEqualTo("foo-sender");
}
// Make sure the producer was closed by the template (albeit indirectly as
// client removes closed producers)
@@ -188,67 +214,61 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
}
private static Stream<Arguments> sendMessageTestProvider() {
return Stream.of(arguments("sendReactiveMessageToDefaultTopic", SendTestArgs.useSpecificTopic(false)),
arguments("sendReactiveMessageToDefaultTopicWithSimpleApi",
SendTestArgs.useSpecificTopic(false).useSimpleApi()),
arguments("sendReactiveMessageToDefaultTopicWithSimpleApiAndTemplateSchema",
SendTestArgs.useSpecificTopic(false).useSimpleApi().useTemplateSchema()),
arguments("sendReactiveMessageToDefaultTopicWithMessageCustomizer",
SendTestArgs.useSpecificTopic(false).useMessageCustomizer()),
arguments("sendReactiveMessageToDefaultTopicWithProducerCustomizer",
SendTestArgs.useSpecificTopic(false).useSenderCustomizer()),
arguments("sendReactiveMessageToDefaultTopicWithAllOptions",
SendTestArgs.useSpecificTopic(false).useMessageCustomizer().useSenderCustomizer()),
arguments("sendReactiveMessageToSpecificTopic", SendTestArgs.useSpecificTopic(true)),
arguments("sendReactiveMessageToSpecificTopicWithSimpleApi",
SendTestArgs.useSpecificTopic(true).useSimpleApi()),
arguments("sendReactiveMessageToSpecificTopicWithSimpleApiAndTemplateSchema",
SendTestArgs.useSpecificTopic(true).useSimpleApi().useTemplateSchema()),
arguments("sendReactiveMessageToSpecificTopicWithMessageCustomizer",
SendTestArgs.useSpecificTopic(true).useMessageCustomizer()),
arguments("sendReactiveMessageToSpecificTopicWithProducerCustomizer",
SendTestArgs.useSpecificTopic(true).useSenderCustomizer()),
arguments("sendReactiveMessageToSpecificTopicWithAllOptions",
SendTestArgs.useSpecificTopic(true).useMessageCustomizer().useSenderCustomizer()));
return Stream.of(arguments("simpleReactiveSend", SendTestArgs.simple()),
arguments("simpleReactiveSendWithTopic", SendTestArgs.simple().topic()),
arguments("simpleReactiveSendWithSchema", SendTestArgs.simple().schema()),
arguments("simpleReactiveSendWithTopicAndSchema", SendTestArgs.simple().topic().schema()),
arguments("fluentReactiveSend", SendTestArgs.fluent()),
arguments("fluentReactiveSendWithSchema", SendTestArgs.fluent().schema()),
arguments("fluentReactiveSendWithTopic", SendTestArgs.fluent().topic()),
arguments("fluentReactiveSendWithMessageCustomizer", SendTestArgs.fluent().messageCustomizer()),
arguments("fluentReactiveSendWithSenderCustomizer", SendTestArgs.fluent().senderCustomizer()),
arguments("fluentReactiveSendWithTopicAndSchema", SendTestArgs.fluent().topic().schema()),
arguments("fluentReactiveSendWithTopicAndSchemaAndCustomizers",
SendTestArgs.fluent().topic().schema().messageCustomizer().senderCustomizer()));
}
static final class SendTestArgs {
private final boolean useSpecificTopic;
private boolean simpleApi;
private boolean useMessageCustomizer;
private boolean explicitTopic;
private boolean useSenderCustomizer;
private boolean explicitSchema;
private boolean useSimpleApi;
private boolean messageCustomizer;
private boolean useTemplateSchema;
private boolean senderCustomizer;
private SendTestArgs(boolean useSpecificTopic) {
this.useSpecificTopic = useSpecificTopic;
private SendTestArgs(boolean simpleApi) {
this.simpleApi = simpleApi;
}
static SendTestArgs useSpecificTopic(boolean useSpecificTopic) {
return new SendTestArgs(useSpecificTopic);
static SendTestArgs simple() {
return new SendTestArgs(true);
}
SendTestArgs useMessageCustomizer() {
this.useMessageCustomizer = true;
static SendTestArgs fluent() {
return new SendTestArgs(false);
}
SendTestArgs topic() {
this.explicitTopic = true;
return this;
}
SendTestArgs useSenderCustomizer() {
this.useSenderCustomizer = true;
SendTestArgs schema() {
this.explicitSchema = true;
return this;
}
SendTestArgs useSimpleApi() {
this.useSimpleApi = true;
SendTestArgs messageCustomizer() {
this.messageCustomizer = true;
return this;
}
SendTestArgs useTemplateSchema() {
this.useTemplateSchema = true;
SendTestArgs senderCustomizer() {
this.senderCustomizer = true;
return this;
}

View File

@@ -87,6 +87,7 @@ import reactor.core.publisher.Mono;
* Tests for {@link ReactivePulsarListener} annotation.
*
* @author Christophe Bornet
* @author Chris Bono
*/
@SpringJUnitConfig
@DirtiesContext
@@ -199,13 +200,13 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
@ReactivePulsarListener(id = "id-1", topics = "topic-1", subscriptionName = "subscription-1",
consumerCustomizer = "consumerCustomizer")
Mono<Void> listen1(String message) {
Mono<Void> listen1(String ignored) {
latch1.countDown();
return Mono.empty();
}
@ReactivePulsarListener(consumerCustomizer = "listen2Customizer")
Mono<Void> listen2(String message) {
Mono<Void> listen2(String ignored) {
latch2.countDown();
return Mono.empty();
}
@@ -218,7 +219,7 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
@ReactivePulsarListener(id = "id-3", topicPattern = "persistent://public/default/pattern.*",
subscriptionName = "subscription-3", consumerCustomizer = "consumerCustomizer")
Mono<Void> listen3(String message) {
Mono<Void> listen3(String ignored) {
latch3.countDown();
return Mono.empty();
}
@@ -350,10 +351,8 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
PulsarProducerFactory<User> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarTemplate<User> template = new PulsarTemplate<>(pulsarProducerFactory);
template.setSchema(JSONSchema.of(User.class));
for (int i = 0; i < 3; i++) {
template.send("json-topic", new User("Jason", i));
template.send("json-topic", new User("Jason", i), JSONSchema.of(User.class));
}
assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@@ -363,10 +362,8 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
PulsarProducerFactory<User> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarTemplate<User> template = new PulsarTemplate<>(pulsarProducerFactory);
template.setSchema(AvroSchema.of(User.class));
for (int i = 0; i < 3; i++) {
template.send("avro-topic", new User("Avi", i));
template.send("avro-topic", new User("Avi", i), AvroSchema.of(User.class));
}
assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@@ -378,10 +375,8 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
PulsarTemplate<KeyValue<String, Integer>> template = new PulsarTemplate<>(pulsarProducerFactory);
Schema<KeyValue<String, Integer>> kvSchema = Schema.KeyValue(Schema.STRING, Schema.INT32,
KeyValueEncodingType.INLINE);
template.setSchema(kvSchema);
for (int i = 0; i < 3; i++) {
template.send("keyvalue-topic", new KeyValue<>("Kevin", i));
template.send("keyvalue-topic", new KeyValue<>("Kevin", i), kvSchema);
}
assertThat(keyvalueLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@@ -391,10 +386,9 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
PulsarProducerFactory<Proto.Person> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarTemplate<Proto.Person> template = new PulsarTemplate<>(pulsarProducerFactory);
template.setSchema(ProtobufSchema.of(Proto.Person.class));
for (int i = 0; i < 3; i++) {
template.send("protobuf-topic", Proto.Person.newBuilder().setId(i).setName("Paul").build());
template.send("protobuf-topic", Proto.Person.newBuilder().setId(i).setName("Paul").build(),
ProtobufSchema.of(Proto.Person.class));
}
assertThat(protobufLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@@ -405,28 +399,28 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
@ReactivePulsarListener(id = "jsonListener", topics = "json-topic", schemaType = SchemaType.JSON,
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenJson(User message) {
Mono<Void> listenJson(User ignored) {
jsonLatch.countDown();
return Mono.empty();
}
@ReactivePulsarListener(id = "avroListener", topics = "avro-topic", schemaType = SchemaType.AVRO,
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenAvro(User message) {
Mono<Void> listenAvro(User ignored) {
avroLatch.countDown();
return Mono.empty();
}
@ReactivePulsarListener(id = "keyvalueListener", topics = "keyvalue-topic",
schemaType = SchemaType.KEY_VALUE, consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenKeyvalue(KeyValue<String, Integer> message) {
Mono<Void> listenKeyvalue(KeyValue<String, Integer> ignored) {
keyvalueLatch.countDown();
return Mono.empty();
}
@ReactivePulsarListener(id = "protobufListener", topics = "protobuf-topic",
schemaType = SchemaType.PROTOBUF, consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenProtobuf(Proto.Person message) {
Mono<Void> listenProtobuf(Proto.Person ignored) {
protobufLatch.countDown();
return Mono.empty();
}
@@ -509,10 +503,8 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
PulsarProducerFactory<User2> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarTemplate<User2> template = new PulsarTemplate<>(pulsarProducerFactory);
template.setSchema(JSONSchema.of(User2.class));
for (int i = 0; i < 3; i++) {
template.send("json-custom-schema-topic", new User2("Jason", i));
template.send("json-custom-schema-topic", new User2("Jason", i), JSONSchema.of(User2.class));
}
assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@@ -522,10 +514,8 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
PulsarProducerFactory<User> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarTemplate<User> template = new PulsarTemplate<>(pulsarProducerFactory);
template.setSchema(AvroSchema.of(User.class));
for (int i = 0; i < 3; i++) {
template.send("avro-custom-schema-topic", new User("Avi", i));
template.send("avro-custom-schema-topic", new User("Avi", i), AvroSchema.of(User.class));
}
assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@@ -537,10 +527,9 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
PulsarTemplate<KeyValue<String, User2>> template = new PulsarTemplate<>(pulsarProducerFactory);
Schema<KeyValue<String, User2>> kvSchema = Schema.KeyValue(Schema.STRING, Schema.JSON(User2.class),
KeyValueEncodingType.INLINE);
template.setSchema(kvSchema);
for (int i = 0; i < 3; i++) {
template.send("keyvalue-custom-schema-topic", new KeyValue<>("Kevin", new User2("Kevin", 5150)));
template.send("keyvalue-custom-schema-topic", new KeyValue<>("Kevin", new User2("Kevin", 5150)),
kvSchema);
}
assertThat(keyvalueLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@@ -550,11 +539,10 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
PulsarProducerFactory<Proto.Person> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarTemplate<Proto.Person> template = new PulsarTemplate<>(pulsarProducerFactory);
template.setSchema(ProtobufSchema.of(Proto.Person.class));
for (int i = 0; i < 3; i++) {
template.send("protobuf-custom-schema-topic",
Proto.Person.newBuilder().setId(i).setName("Paul").build());
Proto.Person.newBuilder().setId(i).setName("Paul").build(),
ProtobufSchema.of(Proto.Person.class));
}
assertThat(protobufLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@@ -582,28 +570,28 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
@ReactivePulsarListener(id = "jsonListener", topics = "json-custom-schema-topic",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenJson(User2 message) {
Mono<Void> listenJson(User2 ignored) {
jsonLatch.countDown();
return Mono.empty();
}
@ReactivePulsarListener(id = "avroListener", topics = "avro-custom-schema-topic",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenAvro(User message) {
Mono<Void> listenAvro(User ignored) {
avroLatch.countDown();
return Mono.empty();
}
@ReactivePulsarListener(id = "keyvalueListener", topics = "keyvalue-custom-schema-topic",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenKeyvalue(KeyValue<String, User2> message) {
Mono<Void> listenKeyvalue(KeyValue<String, User2> ignored) {
keyvalueLatch.countDown();
return Mono.empty();
}
@ReactivePulsarListener(id = "protobufListener", topics = "protobuf-custom-schema-topic",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenProtobuf(Proto.Person message) {
Mono<Void> listenProtobuf(Proto.Person ignored) {
protobufLatch.countDown();
return Mono.empty();
}
@@ -827,7 +815,7 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
@ReactivePulsarListener(topics = "pulsarListenerConcurrency", consumerCustomizer = "consumerCustomizer",
concurrency = "100")
Mono<Void> listen1(String message) {
Mono<Void> listen1(String ignored) {
latch.countDown();
// if messages are not handled concurrently, this will make the latch
// await timeout.