diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java index 13f59f74..fa6861c5 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java @@ -28,7 +28,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -66,11 +65,9 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS @Test void basicDefaultConsumer() throws Exception { - Map config = new HashMap<>(); - HashSet strings = new HashSet<>(); - strings.add("dpmlct-012"); - config.put("topicNames", strings); - config.put("subscriptionName", "dpmlct-sb-012"); + Set topics = Collections.singleton("dpmlct-012"); + Map config = Map.of("topicNames", topics, "subscriptionName", "dpmlct-sb-012"); + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) .build(); DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, @@ -100,6 +97,7 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS Set topics = Collections.singleton("containerPauseResumeWaitNotify-topic"); Map config = Map.of("topicNames", topics, "subscriptionName", "containerPauseResumeWaitNotify-sub"); + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) .build(); DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, @@ -166,12 +164,10 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS @Test void subscriptionInitialPositionEarliest() throws Exception { - Map config = new HashMap<>(); - HashSet strings = new HashSet<>(); - strings.add("dpmlct-013"); - config.put("topicNames", strings); - config.put("subscriptionName", "dpmlct-sb-013"); - config.put("subscriptionInitialPosition", SubscriptionInitialPosition.Earliest); + Set topics = Collections.singleton("dpmlct-013"); + Map config = Map.of("topicNames", topics, "subscriptionName", "dpmlct-sb-013", + "subscriptionInitialPosition", SubscriptionInitialPosition.Earliest); + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) .build(); DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, @@ -201,11 +197,9 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS @Test void subscriptionInitialPositionDefaultLatest() throws Exception { - Map config = new HashMap<>(); - HashSet strings = new HashSet<>(); - strings.add("dpmlct-014"); - config.put("topicNames", strings); - config.put("subscriptionName", "dpmlct-sb-014"); + Set topics = Collections.singleton("dpmlct-014"); + Map config = Map.of("topicNames", topics, "subscriptionName", "dpmlct-sb-014"); + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) .build(); DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, @@ -238,13 +232,11 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS @Test void negativeAckRedeliveryBackoff() throws Exception { - Map config = new HashMap<>(); - config.put("topicNames", Collections.singleton("dpmlct-015")); - config.put("subscriptionName", "dpmlct-sb-015"); - + Set topics = Collections.singleton("dpmlct-015"); RedeliveryBackoff redeliveryBackoff = MultiplierRedeliveryBackoff.builder().minDelayMs(1000) .maxDelayMs(5 * 1000).build(); - config.put("negativeAckRedeliveryBackoff", redeliveryBackoff); + Map config = Map.of("topicNames", topics, "subscriptionName", "dpmlct-sb-015", + "negativeAckRedeliveryBackoff", redeliveryBackoff); PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) .build(); @@ -288,13 +280,11 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS @Test void deadLetterPolicyDefault() throws Exception { - Map config = new HashMap<>(); - config.put("topicNames", Collections.singleton("dpmlct-016")); - config.put("subscriptionName", "dpmlct-sb-016"); - config.put("ackTimeoutMillis", 1); + Set topics = Collections.singleton("dpmlct-016"); DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder().maxRedeliverCount(1) .deadLetterTopic("dpmlct-016-dlq-topic").build(); - config.put("deadLetterPolicy", deadLetterPolicy); + Map config = Map.of("topicNames", topics, "subscriptionName", "dpmlct-sb-016", + "ackTimeoutMillis", 1, "deadLetterPolicy", deadLetterPolicy); PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) .build(); @@ -346,13 +336,11 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS @Test void deadLetterPolicyCustom() throws Exception { - Map config = new HashMap<>(); - config.put("topicNames", Collections.singleton("dpmlct-016")); - config.put("subscriptionName", "dpmlct-sb-016"); - config.put("ackTimeoutMillis", 1); + Set topics = Collections.singleton("dpmlct-017"); DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder().maxRedeliverCount(5).deadLetterTopic("dlq-topic") .build(); - config.put("deadLetterPolicy", deadLetterPolicy); + Map config = Map.of("topicNames", topics, "subscriptionName", "dpmlct-sb-016", + "ackTimeoutMillis", 1, "deadLetterPolicy", deadLetterPolicy); PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) .build(); @@ -388,7 +376,7 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS pulsarConsumerFactory, pulsarContainerProperties); container.start(); - Map prodConfig = Collections.singletonMap("topicName", "dpmlct-016"); + Map prodConfig = Collections.singletonMap("topicName", "dpmlct-017"); DefaultPulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, prodConfig); PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);