diff --git a/spring-pulsar-dependencies/build.gradle b/spring-pulsar-dependencies/build.gradle index f6b45200..e600a3b3 100644 --- a/spring-pulsar-dependencies/build.gradle +++ b/spring-pulsar-dependencies/build.gradle @@ -11,7 +11,7 @@ ext { protobufJavaVersion = '3.21.5' testcontainersVersion = '1.17.3' pulsarVersion = '2.10.2' - pulsarClientReactiveVersion = '0.1.0-SNAPSHOT' + pulsarClientReactiveVersion = '0.1.0' springBootVersion = '3.0.0' } diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java index 61466b76..b033ecb9 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java @@ -152,7 +152,7 @@ public non-sealed class DefaultReactivePulsarMessageListenerContainer builder.subscriptionName(containerProperties.getSubscriptionName()); } if (!CollectionUtils.isEmpty(containerProperties.getTopics())) { - builder.topicNames(new ArrayList<>(containerProperties.getTopics())); + builder.topics(new ArrayList<>(containerProperties.getTopics())); } if (containerProperties.getTopicsPattern() != null) { builder.topicsPattern(containerProperties.getTopicsPattern()); @@ -181,7 +181,7 @@ public non-sealed class DefaultReactivePulsarMessageListenerContainer .handlingTimeout(containerProperties.getHandlingTimeout()); if (containerProperties.getConcurrency() > 0) { ConcurrentOneByOneMessagePipelineBuilder concurrentPipelineBuilder = messagePipelineBuilder - .concurrent().concurrency(containerProperties.getConcurrency()); + .concurrency(containerProperties.getConcurrency()); if (containerProperties.isUseKeyOrderedProcessing()) { concurrentPipelineBuilder.useKeyOrderedProcessing(); } diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java index 4171013c..b494d50a 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java @@ -132,7 +132,7 @@ class DefaultReactivePulsarMessageListenerContainerTests implements PulsarTestCo pulsarConsumerFactory .createConsumer(Schema.STRING, Collections.singletonList( - c -> c.topicNames(Collections.singletonList(topic)).subscriptionName(subscriptionName))) + c -> c.topics(Collections.singletonList(topic)).subscriptionName(subscriptionName))) .consumeNothing().block(Duration.ofSeconds(10)); CountDownLatch latch = new CountDownLatch(1); ReactivePulsarContainerProperties pulsarContainerProperties = new ReactivePulsarContainerProperties<>(); @@ -244,7 +244,7 @@ class DefaultReactivePulsarMessageListenerContainerTests implements PulsarTestCo pulsarConsumerFactory .createConsumer(Schema.STRING, Collections.singletonList( - c -> c.topicNames(Collections.singletonList(topic)).subscriptionName(subscriptionName))) + c -> c.topics(Collections.singletonList(topic)).subscriptionName(subscriptionName))) .consumeNothing().block(Duration.ofSeconds(10)); CountDownLatch latch = new CountDownLatch(1); ReactivePulsarContainerProperties pulsarContainerProperties = new ReactivePulsarContainerProperties<>(); @@ -282,7 +282,7 @@ class DefaultReactivePulsarMessageListenerContainerTests implements PulsarTestCo DefaultReactivePulsarConsumerFactory pulsarConsumerFactory = new DefaultReactivePulsarConsumerFactory<>( reactivePulsarClient, config); ReactiveMessageConsumer dlqConsumer = pulsarConsumerFactory.createConsumer(Schema.STRING, - Collections.singletonList(b -> b.topicNames(Collections.singletonList(deadLetterTopic)))); + Collections.singletonList(b -> b.topics(Collections.singletonList(deadLetterTopic)))); // Ensure subscriptions are created pulsarConsumerFactory.createConsumer(Schema.STRING).consumeNothing().block(Duration.ofSeconds(10)); diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java index 1d1ca48c..25b51dda 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java @@ -209,7 +209,7 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { @Bean ReactiveMessageConsumerBuilderCustomizer listen2Customizer() { - return b -> b.topicNames(List.of("topic-2")) + return b -> b.topics(List.of("topic-2")) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest); }