Test cleanup: DefaultPulsarMessageListenerContainerTests

This commit is contained in:
Soby Chacko
2023-01-31 16:58:39 -05:00
parent ce96e7f5a9
commit 051aa43521

View File

@@ -28,7 +28,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -66,11 +65,9 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS
@Test
void basicDefaultConsumer() throws Exception {
Map<String, Object> config = new HashMap<>();
HashSet<String> strings = new HashSet<>();
strings.add("dpmlct-012");
config.put("topicNames", strings);
config.put("subscriptionName", "dpmlct-sb-012");
Set<String> topics = Collections.singleton("dpmlct-012");
Map<String, Object> config = Map.of("topicNames", topics, "subscriptionName", "dpmlct-sb-012");
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl())
.build();
DefaultPulsarConsumerFactory<String> pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
@@ -100,6 +97,7 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS
Set<String> topics = Collections.singleton("containerPauseResumeWaitNotify-topic");
Map<String, Object> config = Map.of("topicNames", topics, "subscriptionName",
"containerPauseResumeWaitNotify-sub");
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl())
.build();
DefaultPulsarConsumerFactory<String> pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
@@ -166,12 +164,10 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS
@Test
void subscriptionInitialPositionEarliest() throws Exception {
Map<String, Object> config = new HashMap<>();
HashSet<String> strings = new HashSet<>();
strings.add("dpmlct-013");
config.put("topicNames", strings);
config.put("subscriptionName", "dpmlct-sb-013");
config.put("subscriptionInitialPosition", SubscriptionInitialPosition.Earliest);
Set<String> topics = Collections.singleton("dpmlct-013");
Map<String, Object> config = Map.of("topicNames", topics, "subscriptionName", "dpmlct-sb-013",
"subscriptionInitialPosition", SubscriptionInitialPosition.Earliest);
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl())
.build();
DefaultPulsarConsumerFactory<String> pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
@@ -201,11 +197,9 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS
@Test
void subscriptionInitialPositionDefaultLatest() throws Exception {
Map<String, Object> config = new HashMap<>();
HashSet<String> strings = new HashSet<>();
strings.add("dpmlct-014");
config.put("topicNames", strings);
config.put("subscriptionName", "dpmlct-sb-014");
Set<String> topics = Collections.singleton("dpmlct-014");
Map<String, Object> config = Map.of("topicNames", topics, "subscriptionName", "dpmlct-sb-014");
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl())
.build();
DefaultPulsarConsumerFactory<String> pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
@@ -238,13 +232,11 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS
@Test
void negativeAckRedeliveryBackoff() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("topicNames", Collections.singleton("dpmlct-015"));
config.put("subscriptionName", "dpmlct-sb-015");
Set<String> topics = Collections.singleton("dpmlct-015");
RedeliveryBackoff redeliveryBackoff = MultiplierRedeliveryBackoff.builder().minDelayMs(1000)
.maxDelayMs(5 * 1000).build();
config.put("negativeAckRedeliveryBackoff", redeliveryBackoff);
Map<String, Object> config = Map.of("topicNames", topics, "subscriptionName", "dpmlct-sb-015",
"negativeAckRedeliveryBackoff", redeliveryBackoff);
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl())
.build();
@@ -288,13 +280,11 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS
@Test
void deadLetterPolicyDefault() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("topicNames", Collections.singleton("dpmlct-016"));
config.put("subscriptionName", "dpmlct-sb-016");
config.put("ackTimeoutMillis", 1);
Set<String> topics = Collections.singleton("dpmlct-016");
DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder().maxRedeliverCount(1)
.deadLetterTopic("dpmlct-016-dlq-topic").build();
config.put("deadLetterPolicy", deadLetterPolicy);
Map<String, Object> config = Map.of("topicNames", topics, "subscriptionName", "dpmlct-sb-016",
"ackTimeoutMillis", 1, "deadLetterPolicy", deadLetterPolicy);
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl())
.build();
@@ -346,13 +336,11 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS
@Test
void deadLetterPolicyCustom() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("topicNames", Collections.singleton("dpmlct-016"));
config.put("subscriptionName", "dpmlct-sb-016");
config.put("ackTimeoutMillis", 1);
Set<String> topics = Collections.singleton("dpmlct-017");
DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder().maxRedeliverCount(5).deadLetterTopic("dlq-topic")
.build();
config.put("deadLetterPolicy", deadLetterPolicy);
Map<String, Object> config = Map.of("topicNames", topics, "subscriptionName", "dpmlct-sb-016",
"ackTimeoutMillis", 1, "deadLetterPolicy", deadLetterPolicy);
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl())
.build();
@@ -388,7 +376,7 @@ class DefaultPulsarMessageListenerContainerTests implements PulsarTestContainerS
pulsarConsumerFactory, pulsarContainerProperties);
container.start();
Map<String, Object> prodConfig = Collections.singletonMap("topicName", "dpmlct-016");
Map<String, Object> prodConfig = Collections.singletonMap("topicName", "dpmlct-017");
DefaultPulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
prodConfig);
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);