Bump pulsar-client-reactive to 0.1.0 (#251)

This commit is contained in:
Christophe Bornet
2022-12-12 19:16:29 +01:00
committed by GitHub
parent 0336228e99
commit 7fd522cd64
4 changed files with 7 additions and 7 deletions

View File

@@ -11,7 +11,7 @@ ext {
protobufJavaVersion = '3.21.5'
testcontainersVersion = '1.17.3'
pulsarVersion = '2.10.2'
pulsarClientReactiveVersion = '0.1.0-SNAPSHOT'
pulsarClientReactiveVersion = '0.1.0'
springBootVersion = '3.0.0'
}

View File

@@ -152,7 +152,7 @@ public non-sealed class DefaultReactivePulsarMessageListenerContainer<T>
builder.subscriptionName(containerProperties.getSubscriptionName());
}
if (!CollectionUtils.isEmpty(containerProperties.getTopics())) {
builder.topicNames(new ArrayList<>(containerProperties.getTopics()));
builder.topics(new ArrayList<>(containerProperties.getTopics()));
}
if (containerProperties.getTopicsPattern() != null) {
builder.topicsPattern(containerProperties.getTopicsPattern());
@@ -181,7 +181,7 @@ public non-sealed class DefaultReactivePulsarMessageListenerContainer<T>
.handlingTimeout(containerProperties.getHandlingTimeout());
if (containerProperties.getConcurrency() > 0) {
ConcurrentOneByOneMessagePipelineBuilder<T> concurrentPipelineBuilder = messagePipelineBuilder
.concurrent().concurrency(containerProperties.getConcurrency());
.concurrency(containerProperties.getConcurrency());
if (containerProperties.isUseKeyOrderedProcessing()) {
concurrentPipelineBuilder.useKeyOrderedProcessing();
}

View File

@@ -132,7 +132,7 @@ class DefaultReactivePulsarMessageListenerContainerTests implements PulsarTestCo
pulsarConsumerFactory
.createConsumer(Schema.STRING,
Collections.singletonList(
c -> c.topicNames(Collections.singletonList(topic)).subscriptionName(subscriptionName)))
c -> c.topics(Collections.singletonList(topic)).subscriptionName(subscriptionName)))
.consumeNothing().block(Duration.ofSeconds(10));
CountDownLatch latch = new CountDownLatch(1);
ReactivePulsarContainerProperties<String> pulsarContainerProperties = new ReactivePulsarContainerProperties<>();
@@ -244,7 +244,7 @@ class DefaultReactivePulsarMessageListenerContainerTests implements PulsarTestCo
pulsarConsumerFactory
.createConsumer(Schema.STRING,
Collections.singletonList(
c -> c.topicNames(Collections.singletonList(topic)).subscriptionName(subscriptionName)))
c -> c.topics(Collections.singletonList(topic)).subscriptionName(subscriptionName)))
.consumeNothing().block(Duration.ofSeconds(10));
CountDownLatch latch = new CountDownLatch(1);
ReactivePulsarContainerProperties<String> pulsarContainerProperties = new ReactivePulsarContainerProperties<>();
@@ -282,7 +282,7 @@ class DefaultReactivePulsarMessageListenerContainerTests implements PulsarTestCo
DefaultReactivePulsarConsumerFactory<String> pulsarConsumerFactory = new DefaultReactivePulsarConsumerFactory<>(
reactivePulsarClient, config);
ReactiveMessageConsumer<String> dlqConsumer = pulsarConsumerFactory.createConsumer(Schema.STRING,
Collections.singletonList(b -> b.topicNames(Collections.singletonList(deadLetterTopic))));
Collections.singletonList(b -> b.topics(Collections.singletonList(deadLetterTopic))));
// Ensure subscriptions are created
pulsarConsumerFactory.createConsumer(Schema.STRING).consumeNothing().block(Duration.ofSeconds(10));

View File

@@ -209,7 +209,7 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> listen2Customizer() {
return b -> b.topicNames(List.of("topic-2"))
return b -> b.topics(List.of("topic-2"))
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}