diff --git a/spring-pulsar-docs/src/main/asciidoc/schema-info/schema-info-template.adoc b/spring-pulsar-docs/src/main/asciidoc/schema-info/schema-info-template.adoc index 736c4abe..273a6a94 100644 --- a/spring-pulsar-docs/src/main/asciidoc/schema-info/schema-info-template.adoc +++ b/spring-pulsar-docs/src/main/asciidoc/schema-info/schema-info-template.adoc @@ -1,19 +1,12 @@ == Specifying Schema Information If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data. -However, if you use any complex types (such as `JSON`, `AVRO`, `PROTOBUF`, and others), you need to set the proper schema type on the `{template-class}` before invoking any send operations, as the following example shows for JSON: - -==== -[source, java] ----- -template.setSchema(Schema.JSON(Foo.class)); ----- -==== +However, if you use any complex types (such as `JSON`, `AVRO`, `PROTOBUF`, and others), you need to specify the proper schema when invoking send operations on the `{template-class}`. IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE w/ INLINE encoding. === Custom Schema Mapping -As an alternative to specifying the schema on the `{template-class}` for complex types, the schema resolver can be configured with mappings for the types. -This removes the need to set the schema on the template as the framework consults the resolver using the outgoing message type. +As an alternative to specifying the schema when invoking send operations on the `{template-class}` for complex types, the schema resolver can be configured with mappings for the types. +This removes the need to specify the schema as the framework consults the resolver using the outgoing message type. The following example shows a schema resolver customizer that adds mappings for the `User` and `Address` complex objects using `AVRO` and `JSON` schemas, respectively: @@ -29,11 +22,4 @@ public SchemaResolverCustomizer schemaResolverCustomizer( } ---- ==== -With this configuration in place, there is no need to set the schema on the template, for example: -==== -[source, java] ----- -template.send("user-topic", someUserObject); -template.send("address-topic", someAddressObject); ----- -==== +With this configuration in place, there is no need to set specify the schema on send operations. diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactiveMessageSenderUtils.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactiveMessageSenderUtils.java index 49af1609..5957fa0a 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactiveMessageSenderUtils.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactiveMessageSenderUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import java.util.Optional; import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec; +import org.springframework.lang.Nullable; import org.springframework.util.StringUtils; /** @@ -32,7 +33,7 @@ final class ReactiveMessageSenderUtils { private ReactiveMessageSenderUtils() { } - static String resolveTopicName(String userSpecifiedTopic, + static String resolveTopicName(@Nullable String userSpecifiedTopic, ReactivePulsarSenderFactory reactiveMessageSenderFactory) { ReactiveMessageSenderSpec reactiveMessageSenderSpec = reactiveMessageSenderFactory .getReactiveMessageSenderSpec(); diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.java index 8e1b0687..3751485a 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,11 @@ package org.springframework.pulsar.reactive.core; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Schema; import org.reactivestreams.Publisher; +import org.springframework.lang.Nullable; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -27,6 +30,7 @@ import reactor.core.publisher.Mono; * * @param the message payload type * @author Christophe Bornet + * @author Chris Bono */ public interface ReactivePulsarOperations { @@ -37,6 +41,15 @@ public interface ReactivePulsarOperations { */ Mono send(T message); + /** + * Sends a message to the specified topic in a reactive manner. default topic + * @param message the message to send + * @param schema the schema to use or {@code null} to use the default schema + * resolution + * @return the id assigned by the broker to the published message + */ + Mono send(T message, @Nullable Schema schema); + /** * Sends a message to the specified topic in a reactive manner. * @param topic the topic to send the message to or {@code null} to send to the @@ -44,7 +57,18 @@ public interface ReactivePulsarOperations { * @param message the message to send * @return the id assigned by the broker to the published message */ - Mono send(String topic, T message); + Mono send(@Nullable String topic, T message); + + /** + * Sends a message to the specified topic in a reactive manner. + * @param topic the topic to send the message to or {@code null} to send to the + * default topic + * @param message the message to send + * @param schema the schema to use or {@code null} to use the default schema + * resolution + * @return the id assigned by the broker to the published message + */ + Mono send(@Nullable String topic, T message, @Nullable Schema schema); /** * Sends multiple messages to the default topic in a reactive manner. @@ -54,6 +78,16 @@ public interface ReactivePulsarOperations { */ Flux send(Publisher messages); + /** + * Sends multiple messages to the default topic in a reactive manner. + * @param messages the messages to send + * @param schema the schema to use or {@code null} to use the default schema + * resolution + * @return the ids assigned by the broker to the published messages in the same order + * as they were sent + */ + Flux send(Publisher messages, @Nullable Schema schema); + /** * Sends multiple messages to the specified topic in a reactive manner. * @param topic the topic to send the message to or {@code null} to send to the @@ -62,7 +96,19 @@ public interface ReactivePulsarOperations { * @return the ids assigned by the broker to the published messages in the same order * as they were sent */ - Flux send(String topic, Publisher messages); + Flux send(@Nullable String topic, Publisher messages); + + /** + * Sends multiple messages to the specified topic in a reactive manner. + * @param topic the topic to send the message to or {@code null} to send to the + * default topic + * @param messages the messages to send + * @param schema the schema to use or {@code null} to use the default schema + * resolution + * @return the ids assigned by the broker to the published messages in the same order + * as they were sent + */ + Flux send(@Nullable String topic, Publisher messages, @Nullable Schema schema); /** * Create a {@link SendMessageBuilder builder} for configuring and sending a message @@ -72,6 +118,14 @@ public interface ReactivePulsarOperations { */ SendMessageBuilder newMessage(T message); + /** + * Create a {@link SendMessageBuilder builder} for configuring and sending multiple + * messages reactively. + * @param messages the messages to send + * @return the builder to configure and send the message + */ + SendMessageBuilder newMessages(Publisher messages); + /** * Builder that can be used to configure and send a message. Provides more options * than the send methods provided by {@link ReactivePulsarOperations}. @@ -87,6 +141,13 @@ public interface ReactivePulsarOperations { */ SendMessageBuilder withTopic(String topic); + /** + * Specify the schema to use when sending the message. + * @param schema the schema to use + * @return the current builder with the schema specified + */ + SendMessageBuilder withSchema(Schema schema); + /** * Specifies the message customizer to use to further configure the message. * @param customizer the message customizer @@ -109,6 +170,13 @@ public interface ReactivePulsarOperations { */ Mono send(); + /** + * Send the messages in a reactive manner using the configured specification. + * @return the ids assigned by the broker to the published messages in the same + * order as they were sent + */ + Flux sendMany(); + } } diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java index 97e35953..0311c10e 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java @@ -34,7 +34,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * A thread-safe template for executing high-level reactive Pulsar operations. + * A template for executing high-level reactive Pulsar operations. * * @param the message payload type * @author Christophe Bornet @@ -47,9 +47,6 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { private final SchemaResolver schemaResolver; - @Nullable - private Schema schema; - /** * Construct a template instance that uses the default schema resolver. * @param reactiveMessageSenderFactory the factory used to create the backing Pulsar @@ -76,8 +73,18 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { } @Override - public Mono send(String topic, T message) { - return doSend(topic, message, null, null); + public Mono send(T message, @Nullable Schema schema) { + return doSend(null, message, schema, null, null); + } + + @Override + public Mono send(@Nullable String topic, T message) { + return doSend(topic, message, null, null, null); + } + + @Override + public Mono send(@Nullable String topic, T message, @Nullable Schema schema) { + return doSend(topic, message, schema, null, null); } @Override @@ -86,8 +93,18 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { } @Override - public Flux send(String topic, Publisher messages) { - return doSendMany(topic, Flux.from(messages)); + public Flux send(Publisher messages, @Nullable Schema schema) { + return doSendMany(null, Flux.from(messages), schema); + } + + @Override + public Flux send(@Nullable String topic, Publisher messages) { + return doSendMany(topic, Flux.from(messages), null); + } + + @Override + public Flux send(@Nullable String topic, Publisher messages, @Nullable Schema schema) { + return doSendMany(topic, Flux.from(messages), schema); } @Override @@ -95,62 +112,64 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { return new SendMessageBuilderImpl<>(this, message); } - /** - * Set the schema to use on this template. - * @param schema provides the {@link Schema} used on this template - */ - public void setSchema(Schema schema) { - this.schema = schema; + @Override + public SendMessageBuilder newMessages(Publisher messages) { + return new SendMessageBuilderImpl<>(this, messages); } - private Mono doSend(String topic, T message, - MessageSpecBuilderCustomizer messageSpecBuilderCustomizer, - ReactiveMessageSenderBuilderCustomizer customizer) { + private Mono doSend(@Nullable String topic, T message, @Nullable Schema schema, + @Nullable MessageSpecBuilderCustomizer messageSpecBuilderCustomizer, + @Nullable ReactiveMessageSenderBuilderCustomizer customizer) { String topicName = ReactiveMessageSenderUtils.resolveTopicName(topic, this.reactiveMessageSenderFactory); this.logger.trace(() -> String.format("Sending reactive message to '%s' topic", topicName)); - ReactiveMessageSender sender = createMessageSender(topic, message, customizer); - return sender.sendOne(getMessageSpec(messageSpecBuilderCustomizer, message)).doOnError( - ex -> this.logger.error(ex, () -> String.format("Failed to send message to '%s' topic", topicName))) + // NOTE: We do not pass the resolved topic name from above as it handles the + // resolve itself + ReactiveMessageSender sender = createMessageSender(topic, message, schema, customizer); + // @formatter:off + return sender.sendOne(getMessageSpec(messageSpecBuilderCustomizer, message)) + .doOnError(ex -> this.logger.error(ex, () -> String.format("Failed to send message to '%s' topic", topicName))) .doOnSuccess(msgId -> this.logger.trace(() -> String.format("Sent message to '%s' topic", topicName))); - + // @formatter:on } - private Flux doSendMany(String topic, Flux messages) { + private Flux doSendMany(@Nullable String topic, Flux messages, @Nullable Schema schema) { String topicName = ReactiveMessageSenderUtils.resolveTopicName(topic, this.reactiveMessageSenderFactory); this.logger.trace(() -> String.format("Sending reactive messages to '%s' topic", topicName)); - if (this.schema != null) { + if (schema != null) { /* - * If the template has a schema, we can create the message sender right away - * and use ReactiveMessageSender::sendMany to send them as a stream. - * Otherwise, we need to wait to get a message to create it and we can't share - * it between messages. So we create one each time and use + * If schema specified we can create the message sender right away and use + * ReactiveMessageSender::sendMany to send them as a stream. Otherwise, we + * need to wait to get a message to create the sender, and we can't share it + * between messages. So we create one each time and use * ReactiveMessageSender::sendOne to send messages individually. */ - ReactiveMessageSender sender = createMessageSender(topic, null, null); + // NOTE: We do not pass the resolved topic name from above as it handles the + // resolve itself + ReactiveMessageSender sender = createMessageSender(topic, null, schema, null); return messages.map(MessageSpec::of).as(sender::sendMany) .doOnError(ex -> this.logger.error(ex, () -> String.format("Failed to send messages to '%s' topic", topicName))) .doOnNext( msgId -> this.logger.trace(() -> String.format("Sent messages to '%s' topic", topicName))); } - return messages.flatMapSequential(message -> doSend(topic, message, null, null)); + return messages.flatMapSequential(message -> doSend(topic, message, schema, null, null)); } - private static MessageSpec getMessageSpec(MessageSpecBuilderCustomizer messageSpecBuilderCustomizer, - T message) { + private static MessageSpec getMessageSpec( + @Nullable MessageSpecBuilderCustomizer messageSpecBuilderCustomizer, T message) { MessageSpecBuilder messageSpecBuilder = MessageSpec.builder(message); - if (messageSpecBuilderCustomizer != null) { messageSpecBuilderCustomizer.customize(messageSpecBuilder); } - return messageSpecBuilder.build(); } - private ReactiveMessageSender createMessageSender(String topic, T message, - ReactiveMessageSenderBuilderCustomizer customizer) { - Schema schema = this.schema != null ? this.schema : this.schemaResolver.getSchema(message); + private ReactiveMessageSender createMessageSender(@Nullable String topic, T message, @Nullable Schema schema, + @Nullable ReactiveMessageSenderBuilderCustomizer customizer) { + if (schema == null) { + schema = this.schemaResolver.getSchema(message); + } return this.reactiveMessageSenderFactory.createSender(topic, schema, customizer == null ? Collections.emptyList() : Collections.singletonList(customizer)); } @@ -161,8 +180,12 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { private final T message; + private final Publisher messages; + private String topic; + private Schema schema; + private MessageSpecBuilderCustomizer messageCustomizer; private ReactiveMessageSenderBuilderCustomizer senderCustomizer; @@ -170,6 +193,13 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { SendMessageBuilderImpl(ReactivePulsarTemplate template, T message) { this.template = template; this.message = message; + this.messages = null; + } + + SendMessageBuilderImpl(ReactivePulsarTemplate template, Publisher messages) { + this.template = template; + this.message = null; + this.messages = messages; } @Override @@ -178,6 +208,12 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { return this; } + @Override + public SendMessageBuilderImpl withSchema(Schema schema) { + this.schema = schema; + return this; + } + @Override public SendMessageBuilderImpl withMessageCustomizer(MessageSpecBuilderCustomizer messageCustomizer) { this.messageCustomizer = messageCustomizer; @@ -193,7 +229,13 @@ public class ReactivePulsarTemplate implements ReactivePulsarOperations { @Override public Mono send() { - return this.template.doSend(this.topic, this.message, this.messageCustomizer, this.senderCustomizer); + return this.template.doSend(this.topic, this.message, this.schema, this.messageCustomizer, + this.senderCustomizer); + } + + @Override + public Flux sendMany() { + return this.template.doSendMany(this.topic, Flux.from(this.messages), this.schema); } } diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java index 76fa38e1..745899d5 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java @@ -35,10 +35,10 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec; import org.assertj.core.api.InstanceOfAssertFactories; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.PulsarTestContainerSupport; @@ -50,29 +50,37 @@ import reactor.core.publisher.Mono; * Tests for {@link org.springframework.pulsar.reactive.core.ReactivePulsarTemplate}. * * @author Christophe Bornet + * @author Chris Bono */ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport { - @Test - void sendMessagesWithSpecificSchema() throws Exception { - String topic = "smt-specific-schema-reactive-topic"; + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void sendMessagesWithSpecificSchema(boolean useSimpleApi) throws Exception { + String topic = "rptt-sendMessagesWithSpecificSchema-" + useSimpleApi + "-topic"; + String sub = "rptt-sendMessagesWithSpecificSchema-" + useSimpleApi + "-sub"; try (PulsarClient client = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) .build()) { - try (Consumer consumer = client.newConsumer(Schema.JSON(Foo.class)).topic(topic) - .subscriptionName("smt-specific-schema-reactive-sub").subscribe()) { + try (Consumer consumer = client.newConsumer(Schema.JSON(Foo.class)).topic(topic).subscriptionName(sub) + .subscribe()) { MutableReactiveMessageSenderSpec senderSpec = new MutableReactiveMessageSenderSpec(); senderSpec.setTopicName(topic); - org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory producerFactory = new org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory<>( - client, senderSpec, null); - org.springframework.pulsar.reactive.core.ReactivePulsarTemplate pulsarTemplate = new org.springframework.pulsar.reactive.core.ReactivePulsarTemplate<>( - producerFactory); - pulsarTemplate.setSchema(Schema.JSON(Foo.class)); + ReactivePulsarSenderFactory producerFactory = new DefaultReactivePulsarSenderFactory<>(client, + senderSpec, null); + ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>(producerFactory); List foos = new ArrayList<>(); for (int i = 0; i < 10; i++) { foos.add(new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID())); } - pulsarTemplate.send(Flux.fromIterable(foos)).subscribe(); + + if (useSimpleApi) { + pulsarTemplate.send(Flux.fromIterable(foos), Schema.JSON(Foo.class)).subscribe(); + } + else { + pulsarTemplate.newMessages(Flux.fromIterable(foos)).withSchema(Schema.JSON(Foo.class)).sendMany() + .subscribe(); + } for (int i = 0; i < 10; i++) { assertThat(consumer.receiveAsync()).succeedsWithin(Duration.ofSeconds(3)) @@ -82,31 +90,39 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport { } } - @Test - void sendMessagesWithSpecificSchemaAndCustomTypeMappings() throws Exception { - String topic = "smt-specific-schema-custom-reactive-topic"; + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void sendMessagesWithInferredSchema(boolean useSimpleApi) throws Exception { + String topic = "rptt-sendMessagesWithInferredSchema-" + useSimpleApi + "-topic"; + String sub = "rptt-sendMessagesWithInferredSchema-" + useSimpleApi + "-sub"; try (PulsarClient client = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) .build()) { - try (Consumer consumer = client.newConsumer(Schema.JSON(Foo.class)).topic(topic) - .subscriptionName("smt-specific-schema-custom-reactive-sub").subscribe()) { + try (Consumer consumer = client.newConsumer(Schema.JSON(Foo.class)).topic(topic).subscriptionName(sub) + .subscribe()) { MutableReactiveMessageSenderSpec senderSpec = new MutableReactiveMessageSenderSpec(); senderSpec.setTopicName(topic); - org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory producerFactory = new org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory<>( - client, senderSpec, null); - // Custom schema resolver allows not calling setSchema on template + ReactivePulsarSenderFactory producerFactory = new DefaultReactivePulsarSenderFactory<>(client, + senderSpec, null); + // Custom schema resolver allows not specifying the schema when sending DefaultSchemaResolver schemaResolver = new DefaultSchemaResolver(); schemaResolver.addCustomSchemaMapping(Foo.class, Schema.JSON(Foo.class)); - org.springframework.pulsar.reactive.core.ReactivePulsarTemplate pulsarTemplate = new org.springframework.pulsar.reactive.core.ReactivePulsarTemplate<>( - producerFactory, schemaResolver); + ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>(producerFactory, + schemaResolver); List foos = new ArrayList<>(); for (int i = 0; i < 10; i++) { foos.add(new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID())); } - pulsarTemplate.send(Flux.fromIterable(foos)).subscribe(); - // TODO figure out why ordering is not preserved when template does not - // have schema set + if (useSimpleApi) { + pulsarTemplate.send(Flux.fromIterable(foos)).subscribe(); + } + else { + pulsarTemplate.newMessages(Flux.fromIterable(foos)).sendMany().subscribe(); + } + + // TODO figure out if expected to not be ordered when schema not set on + // template List foos2 = new ArrayList<>(); for (int i = 0; i < 10; i++) { CompletableFuture> receiveFuture = consumer.receiveAsync(); @@ -126,39 +142,49 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport { String subscription = topic + "-sub"; String msgPayload = topic + "-msg"; MessageSpecBuilderCustomizer messageCustomizer = null; - if (testArgs.useMessageCustomizer) { + if (testArgs.messageCustomizer) { messageCustomizer = (mb) -> mb.key("foo-key"); } ReactiveMessageSenderBuilderCustomizer senderCustomizer = null; - if (testArgs.useSenderCustomizer) { - senderCustomizer = (sb) -> sb.producerName("foo-producer"); + if (testArgs.senderCustomizer) { + senderCustomizer = (sb) -> sb.producerName("foo-sender"); } try (PulsarClient client = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) .build()) { try (Consumer consumer = client.newConsumer(Schema.STRING).topic(topic) .subscriptionName(subscription).subscribe()) { + MutableReactiveMessageSenderSpec senderSpec = new MutableReactiveMessageSenderSpec(); - if (!testArgs.useSpecificTopic) { + if (!testArgs.explicitTopic) { senderSpec.setTopicName(topic); } ReactivePulsarSenderFactory senderFactory = new DefaultReactivePulsarSenderFactory<>(client, senderSpec, null); - org.springframework.pulsar.reactive.core.ReactivePulsarTemplate pulsarTemplate = new org.springframework.pulsar.reactive.core.ReactivePulsarTemplate<>( - senderFactory); + ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory); Mono sendResponse; - if (testArgs.useTemplateSchema) { - pulsarTemplate.setSchema(Schema.STRING); - } - if (testArgs.useSimpleApi) { - sendResponse = testArgs.useSpecificTopic ? pulsarTemplate.send(topic, msgPayload) - : pulsarTemplate.send(msgPayload); + if (testArgs.simpleApi) { + if (testArgs.explicitSchema && testArgs.explicitTopic) { + sendResponse = pulsarTemplate.send(topic, msgPayload, Schema.STRING); + } + else if (testArgs.explicitSchema) { + sendResponse = pulsarTemplate.send(msgPayload, Schema.STRING); + } + else if (testArgs.explicitTopic) { + sendResponse = pulsarTemplate.send(topic, msgPayload); + } + else { + sendResponse = pulsarTemplate.send(msgPayload); + } } else { ReactivePulsarTemplate.SendMessageBuilderImpl messageBuilder = pulsarTemplate .newMessage(msgPayload); - if (testArgs.useSpecificTopic) { + if (testArgs.explicitTopic) { messageBuilder = messageBuilder.withTopic(topic); } + if (testArgs.explicitSchema) { + messageBuilder = messageBuilder.withSchema(Schema.STRING); + } if (messageCustomizer != null) { messageBuilder = messageBuilder.withMessageCustomizer(messageCustomizer); } @@ -177,7 +203,7 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport { assertThat(msg.getKey()).isEqualTo("foo-key"); } if (senderCustomizer != null) { - assertThat(msg.getProducerName()).isEqualTo("foo-producer"); + assertThat(msg.getProducerName()).isEqualTo("foo-sender"); } // Make sure the producer was closed by the template (albeit indirectly as // client removes closed producers) @@ -188,67 +214,61 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport { } private static Stream sendMessageTestProvider() { - return Stream.of(arguments("sendReactiveMessageToDefaultTopic", SendTestArgs.useSpecificTopic(false)), - arguments("sendReactiveMessageToDefaultTopicWithSimpleApi", - SendTestArgs.useSpecificTopic(false).useSimpleApi()), - arguments("sendReactiveMessageToDefaultTopicWithSimpleApiAndTemplateSchema", - SendTestArgs.useSpecificTopic(false).useSimpleApi().useTemplateSchema()), - arguments("sendReactiveMessageToDefaultTopicWithMessageCustomizer", - SendTestArgs.useSpecificTopic(false).useMessageCustomizer()), - arguments("sendReactiveMessageToDefaultTopicWithProducerCustomizer", - SendTestArgs.useSpecificTopic(false).useSenderCustomizer()), - arguments("sendReactiveMessageToDefaultTopicWithAllOptions", - SendTestArgs.useSpecificTopic(false).useMessageCustomizer().useSenderCustomizer()), - arguments("sendReactiveMessageToSpecificTopic", SendTestArgs.useSpecificTopic(true)), - arguments("sendReactiveMessageToSpecificTopicWithSimpleApi", - SendTestArgs.useSpecificTopic(true).useSimpleApi()), - arguments("sendReactiveMessageToSpecificTopicWithSimpleApiAndTemplateSchema", - SendTestArgs.useSpecificTopic(true).useSimpleApi().useTemplateSchema()), - arguments("sendReactiveMessageToSpecificTopicWithMessageCustomizer", - SendTestArgs.useSpecificTopic(true).useMessageCustomizer()), - arguments("sendReactiveMessageToSpecificTopicWithProducerCustomizer", - SendTestArgs.useSpecificTopic(true).useSenderCustomizer()), - arguments("sendReactiveMessageToSpecificTopicWithAllOptions", - SendTestArgs.useSpecificTopic(true).useMessageCustomizer().useSenderCustomizer())); + return Stream.of(arguments("simpleReactiveSend", SendTestArgs.simple()), + arguments("simpleReactiveSendWithTopic", SendTestArgs.simple().topic()), + arguments("simpleReactiveSendWithSchema", SendTestArgs.simple().schema()), + arguments("simpleReactiveSendWithTopicAndSchema", SendTestArgs.simple().topic().schema()), + arguments("fluentReactiveSend", SendTestArgs.fluent()), + arguments("fluentReactiveSendWithSchema", SendTestArgs.fluent().schema()), + arguments("fluentReactiveSendWithTopic", SendTestArgs.fluent().topic()), + arguments("fluentReactiveSendWithMessageCustomizer", SendTestArgs.fluent().messageCustomizer()), + arguments("fluentReactiveSendWithSenderCustomizer", SendTestArgs.fluent().senderCustomizer()), + arguments("fluentReactiveSendWithTopicAndSchema", SendTestArgs.fluent().topic().schema()), + arguments("fluentReactiveSendWithTopicAndSchemaAndCustomizers", + SendTestArgs.fluent().topic().schema().messageCustomizer().senderCustomizer())); } static final class SendTestArgs { - private final boolean useSpecificTopic; + private boolean simpleApi; - private boolean useMessageCustomizer; + private boolean explicitTopic; - private boolean useSenderCustomizer; + private boolean explicitSchema; - private boolean useSimpleApi; + private boolean messageCustomizer; - private boolean useTemplateSchema; + private boolean senderCustomizer; - private SendTestArgs(boolean useSpecificTopic) { - this.useSpecificTopic = useSpecificTopic; + private SendTestArgs(boolean simpleApi) { + this.simpleApi = simpleApi; } - static SendTestArgs useSpecificTopic(boolean useSpecificTopic) { - return new SendTestArgs(useSpecificTopic); + static SendTestArgs simple() { + return new SendTestArgs(true); } - SendTestArgs useMessageCustomizer() { - this.useMessageCustomizer = true; + static SendTestArgs fluent() { + return new SendTestArgs(false); + } + + SendTestArgs topic() { + this.explicitTopic = true; return this; } - SendTestArgs useSenderCustomizer() { - this.useSenderCustomizer = true; + SendTestArgs schema() { + this.explicitSchema = true; return this; } - SendTestArgs useSimpleApi() { - this.useSimpleApi = true; + SendTestArgs messageCustomizer() { + this.messageCustomizer = true; return this; } - SendTestArgs useTemplateSchema() { - this.useTemplateSchema = true; + SendTestArgs senderCustomizer() { + this.senderCustomizer = true; return this; } diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java index 38ca4073..de4350a5 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java @@ -87,6 +87,7 @@ import reactor.core.publisher.Mono; * Tests for {@link ReactivePulsarListener} annotation. * * @author Christophe Bornet + * @author Chris Bono */ @SpringJUnitConfig @DirtiesContext @@ -199,13 +200,13 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { @ReactivePulsarListener(id = "id-1", topics = "topic-1", subscriptionName = "subscription-1", consumerCustomizer = "consumerCustomizer") - Mono listen1(String message) { + Mono listen1(String ignored) { latch1.countDown(); return Mono.empty(); } @ReactivePulsarListener(consumerCustomizer = "listen2Customizer") - Mono listen2(String message) { + Mono listen2(String ignored) { latch2.countDown(); return Mono.empty(); } @@ -218,7 +219,7 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { @ReactivePulsarListener(id = "id-3", topicPattern = "persistent://public/default/pattern.*", subscriptionName = "subscription-3", consumerCustomizer = "consumerCustomizer") - Mono listen3(String message) { + Mono listen3(String ignored) { latch3.countDown(); return Mono.empty(); } @@ -350,10 +351,8 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setSchema(JSONSchema.of(User.class)); - for (int i = 0; i < 3; i++) { - template.send("json-topic", new User("Jason", i)); + template.send("json-topic", new User("Jason", i), JSONSchema.of(User.class)); } assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -363,10 +362,8 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setSchema(AvroSchema.of(User.class)); - for (int i = 0; i < 3; i++) { - template.send("avro-topic", new User("Avi", i)); + template.send("avro-topic", new User("Avi", i), AvroSchema.of(User.class)); } assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -378,10 +375,8 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { PulsarTemplate> template = new PulsarTemplate<>(pulsarProducerFactory); Schema> kvSchema = Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE); - template.setSchema(kvSchema); - for (int i = 0; i < 3; i++) { - template.send("keyvalue-topic", new KeyValue<>("Kevin", i)); + template.send("keyvalue-topic", new KeyValue<>("Kevin", i), kvSchema); } assertThat(keyvalueLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -391,10 +386,9 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setSchema(ProtobufSchema.of(Proto.Person.class)); - for (int i = 0; i < 3; i++) { - template.send("protobuf-topic", Proto.Person.newBuilder().setId(i).setName("Paul").build()); + template.send("protobuf-topic", Proto.Person.newBuilder().setId(i).setName("Paul").build(), + ProtobufSchema.of(Proto.Person.class)); } assertThat(protobufLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -405,28 +399,28 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { @ReactivePulsarListener(id = "jsonListener", topics = "json-topic", schemaType = SchemaType.JSON, consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenJson(User message) { + Mono listenJson(User ignored) { jsonLatch.countDown(); return Mono.empty(); } @ReactivePulsarListener(id = "avroListener", topics = "avro-topic", schemaType = SchemaType.AVRO, consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenAvro(User message) { + Mono listenAvro(User ignored) { avroLatch.countDown(); return Mono.empty(); } @ReactivePulsarListener(id = "keyvalueListener", topics = "keyvalue-topic", schemaType = SchemaType.KEY_VALUE, consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenKeyvalue(KeyValue message) { + Mono listenKeyvalue(KeyValue ignored) { keyvalueLatch.countDown(); return Mono.empty(); } @ReactivePulsarListener(id = "protobufListener", topics = "protobuf-topic", schemaType = SchemaType.PROTOBUF, consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenProtobuf(Proto.Person message) { + Mono listenProtobuf(Proto.Person ignored) { protobufLatch.countDown(); return Mono.empty(); } @@ -509,10 +503,8 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setSchema(JSONSchema.of(User2.class)); - for (int i = 0; i < 3; i++) { - template.send("json-custom-schema-topic", new User2("Jason", i)); + template.send("json-custom-schema-topic", new User2("Jason", i), JSONSchema.of(User2.class)); } assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -522,10 +514,8 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setSchema(AvroSchema.of(User.class)); - for (int i = 0; i < 3; i++) { - template.send("avro-custom-schema-topic", new User("Avi", i)); + template.send("avro-custom-schema-topic", new User("Avi", i), AvroSchema.of(User.class)); } assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -537,10 +527,9 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { PulsarTemplate> template = new PulsarTemplate<>(pulsarProducerFactory); Schema> kvSchema = Schema.KeyValue(Schema.STRING, Schema.JSON(User2.class), KeyValueEncodingType.INLINE); - template.setSchema(kvSchema); - for (int i = 0; i < 3; i++) { - template.send("keyvalue-custom-schema-topic", new KeyValue<>("Kevin", new User2("Kevin", 5150))); + template.send("keyvalue-custom-schema-topic", new KeyValue<>("Kevin", new User2("Kevin", 5150)), + kvSchema); } assertThat(keyvalueLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -550,11 +539,10 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setSchema(ProtobufSchema.of(Proto.Person.class)); - for (int i = 0; i < 3; i++) { template.send("protobuf-custom-schema-topic", - Proto.Person.newBuilder().setId(i).setName("Paul").build()); + Proto.Person.newBuilder().setId(i).setName("Paul").build(), + ProtobufSchema.of(Proto.Person.class)); } assertThat(protobufLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -582,28 +570,28 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { @ReactivePulsarListener(id = "jsonListener", topics = "json-custom-schema-topic", consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenJson(User2 message) { + Mono listenJson(User2 ignored) { jsonLatch.countDown(); return Mono.empty(); } @ReactivePulsarListener(id = "avroListener", topics = "avro-custom-schema-topic", consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenAvro(User message) { + Mono listenAvro(User ignored) { avroLatch.countDown(); return Mono.empty(); } @ReactivePulsarListener(id = "keyvalueListener", topics = "keyvalue-custom-schema-topic", consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenKeyvalue(KeyValue message) { + Mono listenKeyvalue(KeyValue ignored) { keyvalueLatch.countDown(); return Mono.empty(); } @ReactivePulsarListener(id = "protobufListener", topics = "protobuf-custom-schema-topic", consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenProtobuf(Proto.Person message) { + Mono listenProtobuf(Proto.Person ignored) { protobufLatch.countDown(); return Mono.empty(); } @@ -827,7 +815,7 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { @ReactivePulsarListener(topics = "pulsarListenerConcurrency", consumerCustomizer = "consumerCustomizer", concurrency = "100") - Mono listen1(String message) { + Mono listen1(String ignored) { latch.countDown(); // if messages are not handled concurrently, this will make the latch // await timeout. diff --git a/spring-pulsar-sample-apps/sample-app1/src/main/java/app1/SpringPulsarBootApp.java b/spring-pulsar-sample-apps/sample-app1/src/main/java/app1/SpringPulsarBootApp.java index 25432b51..8f5361f4 100644 --- a/spring-pulsar-sample-apps/sample-app1/src/main/java/app1/SpringPulsarBootApp.java +++ b/spring-pulsar-sample-apps/sample-app1/src/main/java/app1/SpringPulsarBootApp.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -93,11 +93,10 @@ public class SpringPulsarBootApp { String topic = "hello-pulsar-exclusive-3"; PulsarTemplate pulsarTemplate = new PulsarTemplate<>(producerFactory); - pulsarTemplate.setSchema(Schema.JSON(Foo.class)); return args -> { for (int i = 0; i < 10; i++) { Foo foo = new Foo(i + "-" + "Foo-" + UUID.randomUUID(), i + "-" + "Bar-" + UUID.randomUUID()); - pulsarTemplate.send(topic, foo); + pulsarTemplate.send(topic, foo, Schema.JSON(Foo.class)); } }; } @@ -139,11 +138,10 @@ public class SpringPulsarBootApp { String topic = "hello-pulsar-exclusive-5"; PulsarTemplate pulsarTemplate = new PulsarTemplate<>(producerFactory); - pulsarTemplate.setSchema(Schema.JSON(Foo.class)); return args -> { for (int i = 0; i < 100; i++) { Foo foo = new Foo(i + "-" + "Foo-" + UUID.randomUUID(), i + "-" + "Bar-" + UUID.randomUUID()); - pulsarTemplate.send(topic, foo); + pulsarTemplate.send(topic, foo, Schema.JSON(Foo.class)); } }; } diff --git a/spring-pulsar-sample-apps/sample-reactive/src/main/java/org.springframework.pulsar.example/ReactiveSpringPulsarBootApp.java b/spring-pulsar-sample-apps/sample-reactive/src/main/java/org.springframework.pulsar.example/ReactiveSpringPulsarBootApp.java index 6c53a51e..96281f1b 100644 --- a/spring-pulsar-sample-apps/sample-reactive/src/main/java/org.springframework.pulsar.example/ReactiveSpringPulsarBootApp.java +++ b/spring-pulsar-sample-apps/sample-reactive/src/main/java/org.springframework.pulsar.example/ReactiveSpringPulsarBootApp.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -88,9 +88,10 @@ public class ReactiveSpringPulsarBootApp { @Override public void onApplicationEvent(ApplicationReadyEvent event) { - this.reactivePulsarTemplate.setSchema(Schema.JSON(Foo.class)); + Schema schema = Schema.JSON(Foo.class); Flux.range(0, 10).map((i) -> new Foo("Foo-" + i, "Bar-" + i)) - .as(messages -> this.reactivePulsarTemplate.send("sample-reactive-topic2", messages)).subscribe(); + .as(messages -> this.reactivePulsarTemplate.send("sample-reactive-topic2", messages, schema)) + .subscribe(); } @ReactivePulsarListener(subscriptionName = "sample-reactive-sub2", topics = "sample-reactive-topic2", diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarListenerTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarListenerTests.java index 962378bb..89ab0bb4 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarListenerTests.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarListenerTests.java @@ -77,8 +77,7 @@ class PulsarListenerTests implements PulsarTestContainerSupport { .run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) { @SuppressWarnings("unchecked") PulsarTemplate pulsarTemplate = context.getBean(PulsarTemplate.class); - pulsarTemplate.setSchema(Schema.JSON(Foo.class)); - pulsarTemplate.send("plt-custom-topic1", new Foo("John Doe")); + pulsarTemplate.send("plt-custom-topic1", new Foo("John Doe"), Schema.JSON(Foo.class)); assertThat(LATCH_2.await(20, TimeUnit.SECONDS)).isTrue(); } } @@ -118,7 +117,7 @@ class PulsarListenerTests implements PulsarTestContainerSupport { static class BasicListenerConfig { @PulsarListener(subscriptionName = "plt-sub1", topics = "plt-topic1") - public void listen(String foo) { + public void listen(String ignored) { LATCH_1.countDown(); } @@ -130,7 +129,7 @@ class PulsarListenerTests implements PulsarTestContainerSupport { @PulsarListener(subscriptionName = "plt-custom-sub1", topics = "plt-custom-topic1", schemaType = SchemaType.JSON) - public void listen(Foo foo) { + public void listen(Foo ignored) { LATCH_2.countDown(); } @@ -148,7 +147,7 @@ class PulsarListenerTests implements PulsarTestContainerSupport { } @PulsarListener(subscriptionName = "plt-custom-sub2", topics = "plt-custom-topic2") - public void listen(Foo foo) { + public void listen(Foo ignored) { LATCH_3.countDown(); } diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/ReactivePulsarListenerTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/ReactivePulsarListenerTests.java index 660fb2ed..6661b50d 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/ReactivePulsarListenerTests.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/ReactivePulsarListenerTests.java @@ -86,8 +86,7 @@ class ReactivePulsarListenerTests implements PulsarTestContainerSupport { .run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) { @SuppressWarnings("unchecked") ReactivePulsarTemplate pulsarTemplate = context.getBean(ReactivePulsarTemplate.class); - pulsarTemplate.setSchema(Schema.JSON(Foo.class)); - pulsarTemplate.send("rplt-custom-topic1", new Foo("John Doe")).block(); + pulsarTemplate.send("rplt-custom-topic1", new Foo("John Doe"), Schema.JSON(Foo.class)).block(); assertThat(LATCH2.await(20, TimeUnit.SECONDS)).isTrue(); } } @@ -129,7 +128,7 @@ class ReactivePulsarListenerTests implements PulsarTestContainerSupport { @ReactivePulsarListener(subscriptionName = "rplt-sub1", topics = "rplt-topic1", consumerCustomizer = "consumerCustomizer") - public Mono listen(String foo) { + public Mono listen(String ignored) { LATCH1.countDown(); return Mono.empty(); } @@ -143,7 +142,7 @@ class ReactivePulsarListenerTests implements PulsarTestContainerSupport { @ReactivePulsarListener(subscriptionName = "rplt-custom-sub1", topics = "rplt-custom-topic1", schemaType = SchemaType.JSON, consumerCustomizer = "consumerCustomizer") - public Mono listen(Foo foo) { + public Mono listen(Foo ignored) { LATCH2.countDown(); return Mono.empty(); } @@ -164,7 +163,7 @@ class ReactivePulsarListenerTests implements PulsarTestContainerSupport { @ReactivePulsarListener(subscriptionName = "rplt-custom-sub2", topics = "rplt-custom-topic2", consumerCustomizer = "consumerCustomizer") - public Mono listen(Foo foo) { + public Mono listen(Foo ignored) { LATCH3.countDown(); return Mono.empty(); } diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java index ea86db06..879bc26f 100644 --- a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java @@ -55,6 +55,7 @@ import org.springframework.pulsar.spring.cloud.stream.binder.provisioning.Pulsar * {@link Binder} implementation for Apache Pulsar. * * @author Soby Chacko + * @author Chris Bono */ public class PulsarMessageChannelBinder extends AbstractMessageChannelBinder, ExtendedProducerProperties, PulsarTopicProvisioner> @@ -80,21 +81,24 @@ public class PulsarMessageChannelBinder extends @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties producerProperties, MessageChannel errorChannel) { - + final Schema schema; if (producerProperties.isUseNativeEncoding()) { - Schema schema = resolveSchema(producerProperties.getExtension().getSchemaType(), + schema = resolveSchema(producerProperties.getExtension().getSchemaType(), producerProperties.getExtension().getMessageType(), producerProperties.getExtension().getMessageKeyType(), producerProperties.getExtension().getMessageValueType()); - this.pulsarTemplate.setSchema( - Objects.requireNonNull(schema, "Could not determine producer schema for " + destination.getName())); + Objects.requireNonNull(schema, "Could not determine producer schema for " + destination.getName()); + } + else { + schema = null; } return message -> { try { - PulsarMessageChannelBinder.this.pulsarTemplate.sendAsync(destination.getName(), message.getPayload()); + PulsarMessageChannelBinder.this.pulsarTemplate.sendAsync(destination.getName(), message.getPayload(), + schema); } - catch (PulsarClientException e) { - // deal later + catch (PulsarClientException ex) { + this.logger.trace("Failed to send message to destination: " + destination.getName(), ex); } }; } @@ -109,7 +113,6 @@ public class PulsarMessageChannelBinder extends Message message = MessageBuilder.withPayload(msg.getValue()).build(); pulsarMessageDrivenChannelAdapter.send(message); }); - if (properties.isUseNativeDecoding()) { Schema schema = resolveSchema(properties.getExtension().getSchemaType(), properties.getExtension().getMessageType(), properties.getExtension().getMessageKeyType(), @@ -120,7 +123,6 @@ public class PulsarMessageChannelBinder extends else { pulsarContainerProperties.setSchema(Schema.BYTES); } - String subscriptionName = PulsarBinderUtils.subscriptionName(properties.getExtension(), destination); pulsarContainerProperties.setSubscriptionName(subscriptionName); DefaultPulsarMessageListenerContainer container = new DefaultPulsarMessageListenerContainer<>( diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java index 70c4426e..fa4247fe 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarOperations.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.springframework.lang.Nullable; @@ -41,6 +42,16 @@ public interface PulsarOperations { */ MessageId send(T message) throws PulsarClientException; + /** + * Sends a message to the default topic in a blocking manner. + * @param message the message to send + * @param schema the schema to use or {@code null} to send using the default schema + * resolution + * @return the id assigned by the broker to the published message + * @throws PulsarClientException if an error occurs + */ + MessageId send(T message, @Nullable Schema schema) throws PulsarClientException; + /** * Sends a message to the specified topic in a blocking manner. * @param topic the topic to send the message to or {@code null} to send to the @@ -51,6 +62,18 @@ public interface PulsarOperations { */ MessageId send(@Nullable String topic, T message) throws PulsarClientException; + /** + * Sends a message to the specified topic in a blocking manner. + * @param topic the topic to send the message to or {@code null} to send to the + * default topic + * @param message the message to send + * @param schema the schema to use or {@code null} to send using the default schema + * resolution + * @return the id assigned by the broker to the published message + * @throws PulsarClientException if an error occurs + */ + MessageId send(@Nullable String topic, T message, @Nullable Schema schema) throws PulsarClientException; + /** * Sends a message to the default topic in a non-blocking manner. * @param message the message to send @@ -59,6 +82,16 @@ public interface PulsarOperations { */ CompletableFuture sendAsync(T message) throws PulsarClientException; + /** + * Sends a message to the default topic in a non-blocking manner. + * @param message the message to send + * @param schema the schema to use or {@code null} to send using the default schema + * resolution + * @return a future that holds the id assigned by the broker to the published message + * @throws PulsarClientException if an error occurs + */ + CompletableFuture sendAsync(T message, @Nullable Schema schema) throws PulsarClientException; + /** * Sends a message to the specified topic in a non-blocking manner. * @param topic the topic to send the message to or {@code null} to send to the @@ -69,6 +102,19 @@ public interface PulsarOperations { */ CompletableFuture sendAsync(@Nullable String topic, T message) throws PulsarClientException; + /** + * Sends a message to the specified topic in a non-blocking manner. + * @param topic the topic to send the message to or {@code null} to send to the + * default topic + * @param message the message to send + * @param schema the schema to use or {@code null} to send using the default schema + * resolution + * @return a future that holds the id assigned by the broker to the published message + * @throws PulsarClientException if an error occurs + */ + CompletableFuture sendAsync(@Nullable String topic, T message, @Nullable Schema schema) + throws PulsarClientException; + /** * Create a {@link SendMessageBuilder builder} for configuring and sending a message. * @param message the payload of the message @@ -91,6 +137,13 @@ public interface PulsarOperations { */ SendMessageBuilder withTopic(String topic); + /** + * Specify the schema to use when sending the message. + * @param schema the schema to use + * @return the current builder with the schema specified + */ + SendMessageBuilder withSchema(Schema schema); + /** * Specify the encryption keys to use. * @param encryptionKeys the encryption keys diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java index 645c349c..7a7fcad4 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.MessageId; @@ -42,7 +43,7 @@ import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; /** - * A thread-safe template for executing high-level Pulsar operations. + * A template for executing high-level Pulsar operations. * * @param the message payload type * @author Soby Chacko @@ -56,7 +57,6 @@ public class PulsarTemplate implements PulsarOperations, BeanNameAware { private final PulsarProducerFactory producerFactory; - @Nullable private final List interceptors; private final SchemaResolver schemaResolver; @@ -69,9 +69,6 @@ public class PulsarTemplate implements PulsarOperations, BeanNameAware { private String beanName = ""; - @Nullable - private Schema schema; - /** * Construct a template instance without interceptors that uses the default schema * resolver. @@ -113,22 +110,43 @@ public class PulsarTemplate implements PulsarOperations, BeanNameAware { @Override public MessageId send(T message) throws PulsarClientException { - return doSend(null, null, message, null, null); + return doSend(null, message, null, null, null, null); + } + + @Override + public MessageId send(T message, @Nullable Schema schema) throws PulsarClientException { + return doSend(null, message, schema, null, null, null); } @Override public MessageId send(@Nullable String topic, T message) throws PulsarClientException { - return doSend(topic, null, message, null, null); + return doSend(topic, message, null, null, null, null); + } + + @Override + public MessageId send(@Nullable String topic, T message, @Nullable Schema schema) throws PulsarClientException { + return doSend(topic, message, schema, null, null, null); } @Override public CompletableFuture sendAsync(T message) throws PulsarClientException { - return doSendAsync(null, null, message, null, null); + return doSendAsync(null, message, null, null, null, null); + } + + @Override + public CompletableFuture sendAsync(T message, @Nullable Schema schema) throws PulsarClientException { + return doSendAsync(null, message, schema, null, null, null); } @Override public CompletableFuture sendAsync(@Nullable String topic, T message) throws PulsarClientException { - return doSendAsync(topic, null, message, null, null); + return doSendAsync(topic, message, null, null, null, null); + } + + @Override + public CompletableFuture sendAsync(@Nullable String topic, T message, @Nullable Schema schema) + throws PulsarClientException { + return doSendAsync(topic, message, schema, null, null, null); } @Override @@ -141,27 +159,21 @@ public class PulsarTemplate implements PulsarOperations, BeanNameAware { this.beanName = beanName; } - /** - * Set the schema to use on this template. - * @param schema provides the {@link Schema} used on this template - */ - public void setSchema(Schema schema) { - this.schema = schema; - } - - private MessageId doSend(@Nullable String topic, @Nullable Collection encryptionKeys, T message, + private MessageId doSend(@Nullable String topic, T message, @Nullable Schema schema, + @Nullable Collection encryptionKeys, @Nullable TypedMessageBuilderCustomizer typedMessageBuilderCustomizer, @Nullable ProducerBuilderCustomizer producerCustomizer) throws PulsarClientException { try { - return doSendAsync(topic, encryptionKeys, message, typedMessageBuilderCustomizer, producerCustomizer).get(); + return doSendAsync(topic, message, schema, encryptionKeys, typedMessageBuilderCustomizer, + producerCustomizer).get(); } catch (Exception ex) { throw PulsarClientException.unwrap(ex); } } - private CompletableFuture doSendAsync(@Nullable String topic, - @Nullable Collection encryptionKeys, T message, + private CompletableFuture doSendAsync(@Nullable String topic, T message, @Nullable Schema schema, + @Nullable Collection encryptionKeys, @Nullable TypedMessageBuilderCustomizer typedMessageBuilderCustomizer, @Nullable ProducerBuilderCustomizer producerCustomizer) throws PulsarClientException { String topicName = ProducerUtils.resolveTopicName(topic, this.producerFactory); @@ -171,7 +183,7 @@ public class PulsarTemplate implements PulsarOperations, BeanNameAware { Observation observation = newObservation(senderContext); try { observation.start(); - Producer producer = prepareProducerForSend(topic, message, encryptionKeys, producerCustomizer); + Producer producer = prepareProducerForSend(topic, message, schema, encryptionKeys, producerCustomizer); TypedMessageBuilder messageBuilder = producer.newMessage().value(message); if (typedMessageBuilderCustomizer != null) { typedMessageBuilderCustomizer.customize(messageBuilder); @@ -207,10 +219,13 @@ public class PulsarTemplate implements PulsarOperations, BeanNameAware { DefaultPulsarTemplateObservationConvention.INSTANCE, () -> senderContext, this.observationRegistry); } - private Producer prepareProducerForSend(@Nullable String topic, T message, + private Producer prepareProducerForSend(@Nullable String topic, T message, @Nullable Schema schema, @Nullable Collection encryptionKeys, @Nullable ProducerBuilderCustomizer producerCustomizer) throws PulsarClientException { - Schema schema = this.schema != null ? this.schema : this.schemaResolver.getSchema(message); + if (schema == null) { + schema = Objects.requireNonNull(this.schemaResolver.getSchema(message), + "Schema must not be null - expecting at least a default schema"); + } List> customizers = new ArrayList<>(); if (!CollectionUtils.isEmpty(this.interceptors)) { customizers.add(builder -> this.interceptors.forEach(builder::intercept)); @@ -230,6 +245,9 @@ public class PulsarTemplate implements PulsarOperations, BeanNameAware { @Nullable private String topic; + @Nullable + private Schema schema; + @Nullable private Collection encryptionKeys; @@ -250,6 +268,12 @@ public class PulsarTemplate implements PulsarOperations, BeanNameAware { return this; } + @Override + public SendMessageBuilder withSchema(Schema schema) { + this.schema = schema; + return this; + } + @Override public SendMessageBuilder withEncryptionKeys(Collection encryptionKeys) { this.encryptionKeys = encryptionKeys; @@ -270,14 +294,14 @@ public class PulsarTemplate implements PulsarOperations, BeanNameAware { @Override public MessageId send() throws PulsarClientException { - return this.template.doSend(this.topic, this.encryptionKeys, this.message, this.messageCustomizer, - this.producerCustomizer); + return this.template.doSend(this.topic, this.message, this.schema, this.encryptionKeys, + this.messageCustomizer, this.producerCustomizer); } @Override public CompletableFuture sendAsync() throws PulsarClientException { - return this.template.doSendAsync(this.topic, this.encryptionKeys, this.message, this.messageCustomizer, - this.producerCustomizer); + return this.template.doSendAsync(this.topic, this.message, this.schema, this.encryptionKeys, + this.messageCustomizer, this.producerCustomizer); } } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java index 11b697ea..92806ef3 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java @@ -61,6 +61,117 @@ import org.springframework.pulsar.test.support.PulsarTestContainerSupport; */ class PulsarTemplateTests implements PulsarTestContainerSupport { + @ParameterizedTest(name = "{0}") + @MethodSource("sendMessageTestProvider") + void sendMessageTest(String testName, SendTestArgs testArgs) throws Exception { + // Use the test args to construct the params to pass to send handler + String topic = testName; + String subscription = topic + "-sub"; + String msgPayload = topic + "-msg"; + TypedMessageBuilderCustomizer messageCustomizer = null; + if (testArgs.messageCustomizer) { + messageCustomizer = (mb) -> mb.key("foo-key"); + } + ProducerBuilderCustomizer producerCustomizer = null; + if (testArgs.producerCustomizer) { + producerCustomizer = (pb) -> pb.producerName("foo-producer"); + } + try (PulsarClient client = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) + .build()) { + try (Consumer consumer = client.newConsumer(Schema.STRING).topic(topic) + .subscriptionName(subscription).subscribe()) { + Map producerConfig = testArgs.explicitTopic ? Collections.emptyMap() + : Collections.singletonMap("topicName", topic); + PulsarProducerFactory producerFactory = new DefaultPulsarProducerFactory<>(client, + producerConfig); + PulsarTemplate pulsarTemplate = new PulsarTemplate<>(producerFactory); + + Object sendResponse; + if (testArgs.simpleApi) { + if (testArgs.explicitSchema && testArgs.explicitTopic) { + sendResponse = testArgs.async ? pulsarTemplate.sendAsync(topic, msgPayload, Schema.STRING) + : pulsarTemplate.send(topic, msgPayload, Schema.STRING); + } + else if (testArgs.explicitSchema) { + sendResponse = testArgs.async ? pulsarTemplate.sendAsync(msgPayload, Schema.STRING) + : pulsarTemplate.send(msgPayload, Schema.STRING); + } + else if (testArgs.explicitTopic) { + sendResponse = testArgs.async ? pulsarTemplate.sendAsync(topic, msgPayload) + : pulsarTemplate.send(topic, msgPayload); + } + else { + sendResponse = testArgs.async ? pulsarTemplate.sendAsync(msgPayload) + : pulsarTemplate.send(msgPayload); + } + } + else { + SendMessageBuilder messageBuilder = pulsarTemplate.newMessage(msgPayload); + if (testArgs.explicitTopic) { + messageBuilder = messageBuilder.withTopic(topic); + } + if (testArgs.explicitSchema) { + messageBuilder = messageBuilder.withSchema(Schema.STRING); + } + if (messageCustomizer != null) { + messageBuilder = messageBuilder.withMessageCustomizer(messageCustomizer); + } + if (producerCustomizer != null) { + messageBuilder = messageBuilder.withProducerCustomizer(producerCustomizer); + } + sendResponse = testArgs.async ? messageBuilder.sendAsync() : messageBuilder.send(); + } + + if (sendResponse instanceof CompletableFuture) { + sendResponse = ((CompletableFuture) sendResponse).get(3, TimeUnit.SECONDS); + } + assertThat(sendResponse).isNotNull(); + + CompletableFuture> receiveMsgFuture = consumer.receiveAsync(); + Message msg = receiveMsgFuture.get(3, TimeUnit.SECONDS); + + assertThat(msg.getData()).asString().isEqualTo(msgPayload); + if (messageCustomizer != null) { + assertThat(msg.getKey()).isEqualTo("foo-key"); + } + if (producerCustomizer != null) { + assertThat(msg.getProducerName()).isEqualTo("foo-producer"); + } + // Make sure the producer was closed by the template (albeit indirectly as + // client removes closed producers) + await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> assertThat(client).extracting("producers") + .asInstanceOf(InstanceOfAssertFactories.COLLECTION).isEmpty()); + } + } + } + + private static Stream sendMessageTestProvider() { + return Stream.of(arguments("simpleSend", SendTestArgs.simple().sync()), + arguments("simpleSendWithTopic", SendTestArgs.simple().sync().topic()), + arguments("simpleSendWithSchema", SendTestArgs.simple().sync().schema()), + arguments("simpleSendWithTopicAndSchema", SendTestArgs.simple().sync().topic().schema()), + arguments("simpleAsyncSend", SendTestArgs.simple().async()), + arguments("simpleAsyncSendWithTopic", SendTestArgs.simple().async().topic()), + arguments("simpleAsyncSendWithSchema", SendTestArgs.simple().async().schema()), + arguments("simpleAsyncSendWithTopicAndSchema", SendTestArgs.simple().async().topic().schema()), + arguments("fluentSend", SendTestArgs.fluent().sync()), + arguments("fluentSendWithSchema", SendTestArgs.fluent().sync().schema()), + arguments("fluentSendWithTopic", SendTestArgs.fluent().sync().topic()), + arguments("fluentSendWithMessageCustomizer", SendTestArgs.fluent().sync().messageCustomizer()), + arguments("fluentSendWithProducerCustomizer", SendTestArgs.fluent().sync().producerCustomizer()), + arguments("fluentSendWithTopicAndSchema", SendTestArgs.fluent().sync().topic().schema()), + arguments("fluentSendWithTopicAndSchemaAndCustomizers", + SendTestArgs.fluent().sync().topic().schema().messageCustomizer().producerCustomizer()), + arguments("fluentAsyncSend", SendTestArgs.fluent().async()), + arguments("fluentAsyncSendWithSchema", SendTestArgs.fluent().async().schema()), + arguments("fluentAsyncSendWithTopic", SendTestArgs.fluent().async().topic()), + arguments("fluentAsyncSendWithMessageCustomizer", SendTestArgs.fluent().async().messageCustomizer()), + arguments("fluentAsyncSendWithProducerCustomizer", SendTestArgs.fluent().async().producerCustomizer()), + arguments("fluentAsyncSendWithTopicAndSchema", SendTestArgs.fluent().async().topic().schema()), + arguments("fluentAsyncSendWithTopicAndSchemaAndCustomizers", + SendTestArgs.fluent().async().topic().schema().messageCustomizer().producerCustomizer())); + } + @ParameterizedTest(name = "{0}") @MethodSource("interceptorInvocationTestProvider") void interceptorInvocationTest(String topic, List interceptors) throws Exception { @@ -78,9 +189,9 @@ class PulsarTemplateTests implements PulsarTestContainerSupport { private static Stream interceptorInvocationTestProvider() { return Stream.of( - arguments(Named.of("testSingleInterceptor", "iit-topic-1"), + arguments(Named.of("singleInterceptor", "iit-topic-1"), Collections.singletonList(mock(ProducerInterceptor.class))), - arguments(Named.of("testMultipleInterceptors", "iit-topic-2"), + arguments(Named.of("multipleInterceptors", "iit-topic-2"), List.of(mock(ProducerInterceptor.class), mock(ProducerInterceptor.class)))); } @@ -94,9 +205,8 @@ class PulsarTemplateTests implements PulsarTestContainerSupport { PulsarProducerFactory producerFactory = new DefaultPulsarProducerFactory<>(client, Collections.singletonMap("topicName", topic)); PulsarTemplate pulsarTemplate = new PulsarTemplate<>(producerFactory); - pulsarTemplate.setSchema(Schema.JSON(Foo.class)); Foo foo = new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID()); - pulsarTemplate.send(foo); + pulsarTemplate.send(foo, Schema.JSON(Foo.class)); assertThat(consumer.receiveAsync()).succeedsWithin(Duration.ofSeconds(3)).extracting(Message::getValue) .isEqualTo(foo); } @@ -104,7 +214,7 @@ class PulsarTemplateTests implements PulsarTestContainerSupport { } @Test - void sendMessageWithSpecificSchemaAndCustomTypeMappings() throws Exception { + void sendMessageWithSpecificSchemaInferredByCustomTypeMappings() throws Exception { String topic = "smt-specific-schema-custom-topic"; try (PulsarClient client = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) .build()) { @@ -112,7 +222,7 @@ class PulsarTemplateTests implements PulsarTestContainerSupport { .subscriptionName("smt-specific-schema-custom-subscription").subscribe()) { PulsarProducerFactory producerFactory = new DefaultPulsarProducerFactory<>(client, Collections.singletonMap("topicName", topic)); - // Custom schema resolver allows not calling setSchema on template + // Custom schema resolver allows not specifying the schema when sending DefaultSchemaResolver schemaResolver = new DefaultSchemaResolver(); schemaResolver.addCustomSchemaMapping(Foo.class, Schema.JSON(Foo.class)); PulsarTemplate pulsarTemplate = new PulsarTemplate<>(producerFactory, Collections.emptyList(), @@ -141,156 +251,59 @@ class PulsarTemplateTests implements PulsarTestContainerSupport { } } - @ParameterizedTest(name = "{0}") - @MethodSource("sendMessageTestProvider") - void sendMessageTest(String testName, SendTestArgs testArgs) throws Exception { - // Use the test args to construct the params to pass to send handler - String topic = testName; - String subscription = topic + "-sub"; - String msgPayload = topic + "-msg"; - TypedMessageBuilderCustomizer messageCustomizer = null; - if (testArgs.useMessageCustomizer) { - messageCustomizer = (mb) -> mb.key("foo-key"); - } - ProducerBuilderCustomizer producerCustomizer = null; - if (testArgs.useProducerCustomizer) { - producerCustomizer = (pb) -> pb.producerName("foo-producer"); - } - - try (PulsarClient client = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build()) { - try (Consumer consumer = client.newConsumer(Schema.STRING).topic(topic) - .subscriptionName(subscription).subscribe()) { - Map producerConfig = testArgs.useSpecificTopic ? Collections.emptyMap() - : Collections.singletonMap("topicName", topic); - PulsarProducerFactory producerFactory = new DefaultPulsarProducerFactory<>(client, - producerConfig); - PulsarTemplate pulsarTemplate = new PulsarTemplate<>(producerFactory); - Object sendResponse; - if (testArgs.useSimpleApi) { - if (testArgs.useAsyncSend) { - sendResponse = testArgs.useSpecificTopic ? pulsarTemplate.sendAsync(topic, msgPayload) - : pulsarTemplate.sendAsync(msgPayload); - } - else { - sendResponse = testArgs.useSpecificTopic ? pulsarTemplate.send(topic, msgPayload) - : pulsarTemplate.send(msgPayload); - } - } - else { - SendMessageBuilder messageBuilder = pulsarTemplate.newMessage(msgPayload); - if (testArgs.useSpecificTopic) { - messageBuilder = messageBuilder.withTopic(topic); - } - if (messageCustomizer != null) { - messageBuilder = messageBuilder.withMessageCustomizer(messageCustomizer); - } - if (producerCustomizer != null) { - messageBuilder = messageBuilder.withProducerCustomizer(producerCustomizer); - } - sendResponse = testArgs.useAsyncSend ? messageBuilder.sendAsync() : messageBuilder.send(); - } - - if (sendResponse instanceof CompletableFuture) { - sendResponse = ((CompletableFuture) sendResponse).get(3, TimeUnit.SECONDS); - } - assertThat(sendResponse).isNotNull(); - - CompletableFuture> receiveMsgFuture = consumer.receiveAsync(); - Message msg = receiveMsgFuture.get(3, TimeUnit.SECONDS); - - assertThat(msg.getData()).asString().isEqualTo(msgPayload); - if (messageCustomizer != null) { - assertThat(msg.getKey()).isEqualTo("foo-key"); - } - if (producerCustomizer != null) { - assertThat(msg.getProducerName()).isEqualTo("foo-producer"); - } - // Make sure the producer was closed by the template (albeit indirectly as - // client removes closed producers) - await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> assertThat(client).extracting("producers") - .asInstanceOf(InstanceOfAssertFactories.COLLECTION).isEmpty()); - } - } - } - - private static Stream sendMessageTestProvider() { - return Stream.of(arguments("sendMessageToDefaultTopic", SendTestArgs.useSpecificTopic(false)), - arguments("sendMessageToDefaultTopicWithSimpleApi", - SendTestArgs.useSpecificTopic(false).useSimpleApi()), - arguments("sendMessageToDefaultTopicWithMessageCustomizer", - SendTestArgs.useSpecificTopic(false).useMessageCustomizer()), - arguments("sendMessageToDefaultTopicWithProducerCustomizer", - SendTestArgs.useSpecificTopic(false).useProducerCustomizer()), - arguments("sendMessageToDefaultTopicWithAllOptions", - SendTestArgs.useSpecificTopic(false).useMessageCustomizer().useProducerCustomizer()), - arguments("sendMessageToSpecificTopic", SendTestArgs.useSpecificTopic(true)), - arguments("sendMessageToSpecificTopicWithSimpleApi", - SendTestArgs.useSpecificTopic(true).useSimpleApi()), - arguments("sendMessageToSpecificTopicWithMessageCustomizer", - SendTestArgs.useSpecificTopic(true).useMessageCustomizer()), - arguments("sendMessageToSpecificTopicWithProducerCustomizer", - SendTestArgs.useSpecificTopic(true).useProducerCustomizer()), - arguments("sendMessageToSpecificTopicWithAllOptions", - SendTestArgs.useSpecificTopic(true).useMessageCustomizer().useProducerCustomizer()), - arguments("sendAsyncMessageToDefaultTopic", SendTestArgs.useSpecificTopic(false).useAsyncSend()), - arguments("sendAsyncMessageToDefaultTopicWithSimpleApi", - SendTestArgs.useSpecificTopic(false).useAsyncSend().useSimpleApi()), - arguments("sendAsyncMessageToDefaultTopicWithMessageCustomizer", - SendTestArgs.useSpecificTopic(false).useMessageCustomizer().useAsyncSend()), - arguments("sendAsyncMessageToDefaultTopicWithProducerCustomizer", - SendTestArgs.useSpecificTopic(false).useProducerCustomizer().useAsyncSend()), - arguments("sendAsyncMessageToDefaultTopicWithAllOptions", - SendTestArgs.useSpecificTopic(false).useMessageCustomizer().useProducerCustomizer() - .useAsyncSend()), - arguments("sendAsyncMessageToSpecificTopic", SendTestArgs.useSpecificTopic(true).useAsyncSend()), - arguments("sendAsyncMessageToSpecificTopicWithSimpleApi", - SendTestArgs.useSpecificTopic(true).useAsyncSend().useSimpleApi()), - arguments("sendAsyncMessageToSpecificTopicWithMessageCustomizer", - SendTestArgs.useSpecificTopic(true).useMessageCustomizer().useAsyncSend()), - arguments("sendAsyncMessageToSpecificTopicWithProducerCustomizer", - SendTestArgs.useSpecificTopic(true).useProducerCustomizer().useAsyncSend()), - arguments("sendAsyncMessageToSpecificTopicWithAllOptions", SendTestArgs.useSpecificTopic(true) - .useMessageCustomizer().useProducerCustomizer().useAsyncSend())); - } - static final class SendTestArgs { - private final boolean useSpecificTopic; + private final boolean simpleApi; - private boolean useMessageCustomizer; + private boolean async; - private boolean useProducerCustomizer; + private boolean explicitTopic; - private boolean useAsyncSend; + private boolean explicitSchema; - private boolean useSimpleApi; + private boolean messageCustomizer; - private SendTestArgs(boolean useSpecificTopic) { - this.useSpecificTopic = useSpecificTopic; + private boolean producerCustomizer; + + private SendTestArgs(boolean simpleApi) { + this.simpleApi = simpleApi; } - static SendTestArgs useSpecificTopic(boolean useSpecificTopic) { - return new SendTestArgs(useSpecificTopic); + static SendTestArgs simple() { + return new SendTestArgs(true); } - SendTestArgs useMessageCustomizer() { - this.useMessageCustomizer = true; + static SendTestArgs fluent() { + return new SendTestArgs(false); + } + + SendTestArgs async() { + this.async = true; return this; } - SendTestArgs useProducerCustomizer() { - this.useProducerCustomizer = true; + SendTestArgs sync() { + this.async = false; return this; } - SendTestArgs useAsyncSend() { - this.useAsyncSend = true; + SendTestArgs topic() { + this.explicitTopic = true; return this; } - SendTestArgs useSimpleApi() { - this.useSimpleApi = true; + SendTestArgs schema() { + this.explicitSchema = true; + return this; + } + + SendTestArgs messageCustomizer() { + this.messageCustomizer = true; + return this; + } + + SendTestArgs producerCustomizer() { + this.producerCustomizer = true; return this; } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java index 7360fceb..c539e202 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java @@ -81,6 +81,7 @@ import org.springframework.util.backoff.FixedBackOff; /** * @author Soby Chacko * @author Alexander Preuß + * @author Chris Bono */ @SpringJUnitConfig @DirtiesContext @@ -230,26 +231,26 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { static class TestPulsarListenersForBasicScenario { @PulsarListener(id = "foo", properties = { "subscriptionName=subscription-1", "topicNames=foo-1" }) - void listen1(String message) { + void listen1(String ignored) { latch.countDown(); } @PulsarListener(id = "bar", topics = "concurrency-on-pl", subscriptionName = "subscription-2", subscriptionType = SubscriptionType.Failover, concurrency = "3") - void listen2(String message) { + void listen2(String ignored) { latch1.countDown(); } @PulsarListener(id = "baz", topicPattern = "persistent://public/default/pattern.*", subscriptionName = "subscription-3", properties = { "patternAutoDiscoveryPeriod=5", "subscriptionInitialPosition=Earliest" }) - void listen3(String message) { + void listen3(String ignored) { latch2.countDown(); } @PulsarListener(id = "ackMode-test-id", subscriptionName = "ackModeTest-sub", topics = "ackModeTest-topic", ackMode = AckMode.RECORD) - void ackModeTestListener(String message) { + void ackModeTestListener(String ignored) { } } @@ -263,8 +264,7 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { static CountDownLatch nackRedeliveryBackoffLatch = new CountDownLatch(5); @Test - void pulsarListenerWithNackRedeliveryBackoff(@Autowired PulsarListenerEndpointRegistry registry) - throws Exception { + void pulsarListenerWithNackRedeliveryBackoff() throws Exception { pulsarTemplate.send("withNegRedeliveryBackoff-test-topic", "hello john doe"); assertThat(nackRedeliveryBackoffLatch.await(15, TimeUnit.SECONDS)).isTrue(); } @@ -298,8 +298,7 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { static CountDownLatch ackTimeoutRedeliveryBackoffLatch = new CountDownLatch(5); @Test - void pulsarListenerWithAckTimeoutRedeliveryBackoff(@Autowired PulsarListenerEndpointRegistry registry) - throws Exception { + void pulsarListenerWithAckTimeoutRedeliveryBackoff() throws Exception { pulsarTemplate.send("withAckTimeoutRedeliveryBackoff-test-topic", "hello john doe"); assertThat(ackTimeoutRedeliveryBackoffLatch.await(60, TimeUnit.SECONDS)).isTrue(); } @@ -313,7 +312,7 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { topics = "withAckTimeoutRedeliveryBackoff-test-topic", ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff", subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1000" }) - void listen(String msg) { + void listen(String ignored) { ackTimeoutRedeliveryBackoffLatch.countDown(); throw new RuntimeException(); } @@ -399,7 +398,7 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { } @PulsarListener(id = "dlqListener", topics = "dlpt-dlq-topic") - void listenDlq(String msg) { + void listenDlq(String ignored) { dlqLatch.countDown(); } @@ -428,7 +427,7 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { @PulsarListener(id = "foobar", topics = "concurrency-on-pl", subscriptionName = "subscription-3", concurrency = "3") - void listen3(String message) { + void listen3(String ignored) { } } @@ -453,10 +452,9 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setSchema(JSONSchema.of(User.class)); - + Schema schema = JSONSchema.of(User.class); for (int i = 0; i < 3; i++) { - template.send("json-topic", new User("Jason", i)); + template.send("json-topic", new User("Jason", i), schema); } assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(jsonBatchLatch.await(10, TimeUnit.SECONDS)).isTrue(); @@ -467,10 +465,9 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setSchema(AvroSchema.of(User.class)); - + Schema schema = AvroSchema.of(User.class); for (int i = 0; i < 3; i++) { - template.send("avro-topic", new User("Avi", i)); + template.send("avro-topic", new User("Avi", i), schema); } assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(avroBatchLatch.await(10, TimeUnit.SECONDS)).isTrue(); @@ -483,10 +480,8 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { PulsarTemplate> template = new PulsarTemplate<>(pulsarProducerFactory); Schema> kvSchema = Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE); - template.setSchema(kvSchema); - for (int i = 0; i < 3; i++) { - template.send("keyvalue-topic", new KeyValue<>("Kevin", i)); + template.send("keyvalue-topic", new KeyValue<>("Kevin", i), kvSchema); } assertThat(keyvalueLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(keyvalueBatchLatch.await(10, TimeUnit.SECONDS)).isTrue(); @@ -497,10 +492,9 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setSchema(ProtobufSchema.of(Proto.Person.class)); - + Schema schema = ProtobufSchema.of(Proto.Person.class); for (int i = 0; i < 3; i++) { - template.send("protobuf-topic", Proto.Person.newBuilder().setId(i).setName("Paul").build()); + template.send("protobuf-topic", Proto.Person.newBuilder().setId(i).setName("Paul").build(), schema); } assertThat(protobufLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(protobufBatchLatch.await(10, TimeUnit.SECONDS)).isTrue(); @@ -512,7 +506,7 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { @PulsarListener(id = "jsonListener", topics = "json-topic", subscriptionName = "subscription-4", schemaType = SchemaType.JSON, properties = { "subscriptionInitialPosition=Earliest" }) - void listenJson(User message) { + void listenJson(User ignored) { jsonLatch.countDown(); } @@ -524,7 +518,7 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { @PulsarListener(id = "avroListener", topics = "avro-topic", subscriptionName = "subscription-6", schemaType = SchemaType.AVRO, properties = { "subscriptionInitialPosition=Earliest" }) - void listenAvro(User message) { + void listenAvro(User ignored) { avroLatch.countDown(); } @@ -536,7 +530,7 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { @PulsarListener(id = "keyvalueListener", topics = "keyvalue-topic", subscriptionName = "subscription-8", schemaType = SchemaType.KEY_VALUE, properties = { "subscriptionInitialPosition=Earliest" }) - void listenKeyvalue(KeyValue message) { + void listenKeyvalue(KeyValue ignored) { keyvalueLatch.countDown(); } @@ -549,7 +543,7 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { @PulsarListener(id = "protobufListener", topics = "protobuf-topic", subscriptionName = "subscription-10", schemaType = SchemaType.PROTOBUF, properties = { "subscriptionInitialPosition=Earliest" }) - void listenProtobuf(Proto.Person message) { + void listenProtobuf(Proto.Person ignored) { protobufLatch.countDown(); } @@ -635,10 +629,9 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setSchema(Schema.JSON(User2.class)); - + Schema schema = Schema.JSON(User2.class); for (int i = 0; i < 3; i++) { - template.send("json-custom-mappings-topic", new User2("Jason", i)); + template.send("json-custom-mappings-topic", new User2("Jason", i), schema); } assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -648,10 +641,9 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setSchema(AvroSchema.of(User.class)); - + Schema schema = AvroSchema.of(User.class); for (int i = 0; i < 3; i++) { - template.send("avro-custom-mappings-topic", new User("Avi", i)); + template.send("avro-custom-mappings-topic", new User("Avi", i), schema); } assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -663,10 +655,9 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { PulsarTemplate> template = new PulsarTemplate<>(pulsarProducerFactory); Schema> kvSchema = Schema.KeyValue(Schema.STRING, Schema.JSON(User2.class), KeyValueEncodingType.INLINE); - template.setSchema(kvSchema); - for (int i = 0; i < 3; i++) { - template.send("keyvalue-custom-mappings-topic", new KeyValue<>("Kevin", new User2("Kevin", 5150))); + template.send("keyvalue-custom-mappings-topic", new KeyValue<>("Kevin", new User2("Kevin", 5150)), + kvSchema); } assertThat(keyvalueLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -676,11 +667,10 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setSchema(ProtobufSchema.of(Proto.Person.class)); - + Schema schema = ProtobufSchema.of(Proto.Person.class); for (int i = 0; i < 3; i++) { template.send("protobuf-custom-mappings-topic", - Proto.Person.newBuilder().setId(i).setName("Paul").build()); + Proto.Person.newBuilder().setId(i).setName("Paul").build(), schema); } assertThat(protobufLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -710,25 +700,25 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { @PulsarListener(id = "jsonListener", topics = "json-custom-mappings-topic", subscriptionName = "subscription-4", properties = { "subscriptionInitialPosition=Earliest" }) - void listenJson(User2 message) { + void listenJson(User2 ignored) { jsonLatch.countDown(); } @PulsarListener(id = "avroListener", topics = "avro-custom-mappings-topic", subscriptionName = "subscription-6", properties = { "subscriptionInitialPosition=Earliest" }) - void listenAvro(User message) { + void listenAvro(User ignored) { avroLatch.countDown(); } @PulsarListener(id = "keyvalueListener", topics = "keyvalue-custom-mappings-topic", subscriptionName = "subscription-8", properties = { "subscriptionInitialPosition=Earliest" }) - void listenKeyvalue(KeyValue message) { + void listenKeyvalue(KeyValue ignored) { keyvalueLatch.countDown(); } @PulsarListener(id = "protobufListener", topics = "protobuf-custom-mappings-topic", subscriptionName = "subscription-10", properties = { "subscriptionInitialPosition=Earliest" }) - void listenProtobuf(Proto.Person message) { + void listenProtobuf(Proto.Person ignored) { protobufLatch.countDown(); }