From 8a3e562f43f13d8cd0ee6d7931e61ec2eec6f8c8 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 21 Jul 2016 18:43:09 -0400 Subject: [PATCH] GH-154: Copy ContainerProperties in the Container Fixes: GH-154 (https://github.com/spring-projects/spring-kafka/issues/154) Previously the `AbstractMessageListenerContainer` save external `ContainerProperties` instance leaving the room for external/internal mutation, which may lead to unexpected behavior * Create a new instance of the `ContainerProperties` in the `AbstractMessageListenerContainer` ctor based on the provided `ContainerProperties` and use `BeanUtils.copyProperties()` for convenience. * Refactoring for tests to meet a new state. Some of them indeed modified `ContainerProperties` after container instantiation. * Remove fake `"propertiesFactory"` topic from the `AbstractKafkaListenerContainerFactory.containerProperties` instance to avoid unexpected behavior. Change that to `(Pattern) null` **Cherry-pick to 1.0.x** Conflicts: spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java Resolved. --- ...AbstractKafkaListenerContainerFactory.java | 4 +- .../AbstractMessageListenerContainer.java | 26 ++++- ...ncurrentMessageListenerContainerTests.java | 95 ++++++++++++------- .../KafkaMessageListenerContainerTests.java | 89 ++++++++--------- 4 files changed, 131 insertions(+), 83 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index e5027c14..1bf5fc87 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -17,6 +17,8 @@ package org.springframework.kafka.config; +import java.util.regex.Pattern; + import org.springframework.beans.BeanUtils; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; @@ -44,7 +46,7 @@ import org.springframework.retry.support.RetryTemplate; public abstract class AbstractKafkaListenerContainerFactory, K, V> implements KafkaListenerContainerFactory, ApplicationEventPublisherAware { - private final ContainerProperties containerProperties = new ContainerProperties("propertiesFactory"); + private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null); private ConsumerFactory consumerFactory; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 171e7c73..ae125221 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.BeanNameAware; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; @@ -113,9 +114,28 @@ public abstract class AbstractMessageListenerContainer protected AbstractMessageListenerContainer(ContainerProperties containerProperties) { Assert.notNull(containerProperties, "'containerProperties' cannot be null"); - this.containerProperties = containerProperties; - if (containerProperties.getConsumerRebalanceListener() == null) { - containerProperties.setConsumerRebalanceListener(createConsumerRebalanceListener()); + + if (containerProperties.getTopics() != null) { + this.containerProperties = new ContainerProperties(containerProperties.getTopics()); + } + else if (containerProperties.getTopicPattern() != null) { + this.containerProperties = new ContainerProperties(containerProperties.getTopicPattern()); + } + else { + this.containerProperties = new ContainerProperties(containerProperties.getTopicPartitions()); + } + + BeanUtils.copyProperties(containerProperties, this.containerProperties, + "topics", "topicPartitions", "topicPattern", "ackCount", "ackTime"); + + if (containerProperties.getAckCount() > 0) { + this.containerProperties.setAckCount(containerProperties.getAckCount()); + } + if (containerProperties.getAckTime() > 0) { + this.containerProperties.setAckTime(containerProperties.getAckTime()); + } + if (this.containerProperties.getConsumerRebalanceListener() == null) { + this.containerProperties.setConsumerRebalanceListener(createConsumerRebalanceListener()); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index 48707612..2080b936 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -101,8 +101,7 @@ public class ConcurrentMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic1); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(4); final Set listenerThreadNames = new ConcurrentSkipListSet<>(); containerProps.setMessageListener((MessageListener) message -> { @@ -110,10 +109,15 @@ public class ConcurrentMessageListenerContainerTests { listenerThreadNames.add(Thread.currentThread().getName()); latch.countDown(); }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); container.setConcurrency(2); container.setBeanName("testAuto"); container.start(); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); @@ -135,8 +139,7 @@ public class ConcurrentMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("test10", "true", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic1); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(4); final Set listenerThreadNames = new ConcurrentSkipListSet<>(); containerProps.setMessageListener((MessageListener) message -> { @@ -162,10 +165,14 @@ public class ConcurrentMessageListenerContainerTests { }); + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); container.setConcurrency(2); container.setBeanName("testAuto"); container.start(); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); @@ -189,8 +196,7 @@ public class ConcurrentMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("test2", "false", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic2); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(4); final Set listenerThreadNames = new ConcurrentSkipListSet<>(); containerProps.setMessageListener((MessageListener) message -> { @@ -198,10 +204,15 @@ public class ConcurrentMessageListenerContainerTests { listenerThreadNames.add(Thread.currentThread().getName()); latch.countDown(); }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); container.setConcurrency(2); container.setBeanName("testBatch"); container.start(); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); @@ -247,25 +258,25 @@ public class ConcurrentMessageListenerContainerTests { }; ContainerProperties container1Props = new ContainerProperties(topic1Partition0); - ConcurrentMessageListenerContainer container1 = - new ConcurrentMessageListenerContainer<>(cf, container1Props); final CountDownLatch latch1 = new CountDownLatch(2); container1Props.setMessageListener((MessageListener) message -> { ConcurrentMessageListenerContainerTests.this.logger.info("auto part: " + message); latch1.countDown(); }); + ConcurrentMessageListenerContainer container1 = + new ConcurrentMessageListenerContainer<>(cf, container1Props); container1.setBeanName("b1"); container1.start(); TopicPartitionInitialOffset topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1); ContainerProperties container2Props = new ContainerProperties(topic1Partition1); - ConcurrentMessageListenerContainer container2 = - new ConcurrentMessageListenerContainer<>(cf, container2Props); final CountDownLatch latch2 = new CountDownLatch(2); container2Props.setMessageListener((MessageListener) message -> { ConcurrentMessageListenerContainerTests.this.logger.info("auto part: " + message); latch2.countDown(); }); + ConcurrentMessageListenerContainer container2 = + new ConcurrentMessageListenerContainer<>(cf, container2Props); container2.setBeanName("b2"); container2.start(); @@ -290,14 +301,15 @@ public class ConcurrentMessageListenerContainerTests { cf = new DefaultKafkaConsumerFactory<>(props); // reset earliest ContainerProperties container3Props = new ContainerProperties(topic1Partition0, topic1Partition1); - ConcurrentMessageListenerContainer resettingContainer = - new ConcurrentMessageListenerContainer<>(cf, container3Props); - resettingContainer.setBeanName("b3"); + final CountDownLatch latch3 = new CountDownLatch(4); container3Props.setMessageListener((MessageListener) message -> { ConcurrentMessageListenerContainerTests.this.logger.info("auto part e: " + message); latch3.countDown(); }); + ConcurrentMessageListenerContainer resettingContainer = + new ConcurrentMessageListenerContainer<>(cf, container3Props); + resettingContainer.setBeanName("b3"); resettingContainer.start(); assertThat(latch3.await(60, TimeUnit.SECONDS)).isTrue(); resettingContainer.stop(); @@ -309,8 +321,7 @@ public class ConcurrentMessageListenerContainerTests { topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, -1000L); topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, -1L); ContainerProperties container4Props = new ContainerProperties(topic1Partition0, topic1Partition1); - resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container4Props); - resettingContainer.setBeanName("b4"); + final CountDownLatch latch4 = new CountDownLatch(3); final AtomicReference receivedMessage = new AtomicReference<>(); container4Props.setMessageListener((MessageListener) message -> { @@ -318,6 +329,8 @@ public class ConcurrentMessageListenerContainerTests { receivedMessage.set(message.value()); latch4.countDown(); }); + resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container4Props); + resettingContainer.setBeanName("b4"); resettingContainer.start(); assertThat(latch4.await(60, TimeUnit.SECONDS)).isTrue(); resettingContainer.stop(); @@ -332,8 +345,7 @@ public class ConcurrentMessageListenerContainerTests { topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, 1L); topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, 1L); ContainerProperties container5Props = new ContainerProperties(topic1Partition0, topic1Partition1); - resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container5Props); - resettingContainer.setBeanName("b4"); + final CountDownLatch latch5 = new CountDownLatch(4); final List messages = new ArrayList<>(); container5Props.setMessageListener((MessageListener) message -> { @@ -341,11 +353,13 @@ public class ConcurrentMessageListenerContainerTests { messages.add(message.value()); latch5.countDown(); }); + + resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container5Props); + resettingContainer.setBeanName("b5"); resettingContainer.start(); assertThat(latch5.await(60, TimeUnit.SECONDS)).isTrue(); resettingContainer.stop(); assertThat(messages).contains("baz", "qux", "FOO", "BAZ"); - this.logger.info("Stop auto parts"); } @@ -363,19 +377,22 @@ public class ConcurrentMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("test" + ackMode, "false", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(4); containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { ConcurrentMessageListenerContainerTests.this.logger.info("manual: " + message); ack.acknowledge(); latch.countDown(); }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); container.setConcurrency(2); containerProps.setAckMode(ackMode); container.setBeanName("test" + ackMode); container.start(); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); @@ -407,17 +424,14 @@ public class ConcurrentMessageListenerContainerTests { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic7); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(8); containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { ConcurrentMessageListenerContainerTests.this.logger.info("manualExisting: " + message); ack.acknowledge(); latch.countDown(); }); - container.setConcurrency(1); containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE); - container.setBeanName("testManualExisting"); + final CountDownLatch commits = new CountDownLatch(8); final AtomicReference exceptionRef = new AtomicReference<>(); containerProps.setCommitCallback((offsets, exception) -> { @@ -426,6 +440,12 @@ public class ConcurrentMessageListenerContainerTests { exceptionRef.compareAndSet(null, exception); } }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); + container.setConcurrency(1); + container.setBeanName("testManualExisting"); + container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); template.sendDefault(0, "fooo"); @@ -457,8 +477,6 @@ public class ConcurrentMessageListenerContainerTests { DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic8); containerProps.setSyncCommits(true); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(8); final BitSet bitSet = new BitSet(8); containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { @@ -467,8 +485,11 @@ public class ConcurrentMessageListenerContainerTests { bitSet.set((int) (message.partition() * 4 + message.offset())); latch.countDown(); }); - container.setConcurrency(1); containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); + container.setConcurrency(1); container.setBeanName("testManualExisting"); container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); @@ -509,9 +530,10 @@ public class ConcurrentMessageListenerContainerTests { }); ContainerProperties containerProps = new ContainerProperties(topic1PartitionS); + containerProps.setMessageListener((MessageListener) message -> { }); + ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(cf, containerProps); - containerProps.setMessageListener((MessageListener) message -> { }); container.setConcurrency(3); container.start(); List> containers = KafkaTestUtils.getPropertyValue(container, @@ -534,8 +556,6 @@ public class ConcurrentMessageListenerContainerTests { ContainerProperties containerProps2 = new ContainerProperties(topic2); BeanUtils.copyProperties(containerProps, containerProps2, "topics", "topicPartitions", "topicPattern", "ackCount", "ackTime"); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(4); final AtomicBoolean catchError = new AtomicBoolean(false); containerProps.setMessageListener((MessageListener) message -> { @@ -543,9 +563,13 @@ public class ConcurrentMessageListenerContainerTests { latch.countDown(); throw new RuntimeException("intended"); }); + containerProps.setErrorHandler((thrownException, record) -> catchError.set(true)); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); container.setConcurrency(2); container.setBeanName("testException"); - containerProps.setErrorHandler((thrownException, record) -> catchError.set(true)); + container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); @@ -620,10 +644,6 @@ public class ConcurrentMessageListenerContainerTests { props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "20000"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic1); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); - ConcurrentMessageListenerContainer container2 = - new ConcurrentMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(8); final Set listenerThreadNames = Collections.synchronizedSet(new HashSet()); List receivedMessages = Collections.synchronizedList(new ArrayList<>()); @@ -639,6 +659,11 @@ public class ConcurrentMessageListenerContainerTests { listenerThreadNames.add(Thread.currentThread().getName()); latch.countDown(); }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); + ConcurrentMessageListenerContainer container2 = + new ConcurrentMessageListenerContainer<>(cf, containerProps); container.setConcurrency(1); container2.setConcurrency(1); container.setBeanName("testAuto"); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index d9471b44..63989ab2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -54,7 +54,6 @@ import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter; import org.springframework.kafka.listener.config.ContainerProperties; -import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.retry.backoff.FixedBackOffPolicy; @@ -100,28 +99,25 @@ public class KafkaMessageListenerContainerTests { // props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 6); // 2 per poll DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic1); - KafkaMessageListenerContainer container = - new KafkaMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(6); final BitSet bitSet = new BitSet(6); - containerProps.setMessageListener(new MessageListener() { - - @Override - public void onMessage(ConsumerRecord message) { - logger.info("slow1: " + message); - bitSet.set((int) (message.partition() * 3 + message.offset())); - try { - Thread.sleep(1000); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - latch.countDown(); + containerProps.setMessageListener((MessageListener) message -> { + logger.info("slow1: " + message); + bitSet.set((int) (message.partition() * 3 + message.offset())); + try { + Thread.sleep(1000); } - + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + latch.countDown(); }); - container.setBeanName("testSlow1"); containerProps.setPauseAfter(100); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + container.setBeanName("testSlow1"); + container.start(); Consumer consumer = spyOnConsumer(container); @@ -161,32 +157,29 @@ public class KafkaMessageListenerContainerTests { DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic); containerProps.setSyncCommits(true); - KafkaMessageListenerContainer container = - new KafkaMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(6); final BitSet bitSet = new BitSet(4); - containerProps.setMessageListener(new AcknowledgingMessageListener() { - - @Override - public void onMessage(ConsumerRecord message, Acknowledgment ack) { - logger.info("slow2: " + message); - bitSet.set((int) (message.partition() * 3 + message.offset())); - try { - Thread.sleep(1000); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - ack.acknowledge(); - latch.countDown(); + containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { + logger.info("slow2: " + message); + bitSet.set((int) (message.partition() * 3 + message.offset())); + try { + Thread.sleep(1000); } - + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + ack.acknowledge(); + latch.countDown(); }); - container.setBeanName("testSlow2"); containerProps.setPauseAfter(100); containerProps.setAckMode(ackMode); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + container.setBeanName("testSlow2"); container.start(); + Consumer consumer = spyOnConsumer(container); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); @@ -220,8 +213,6 @@ public class KafkaMessageListenerContainerTests { containerProps.setPauseAfter(100); containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE); containerProps.setSyncCommits(true); - KafkaMessageListenerContainer container = - new KafkaMessageListenerContainer<>(cf, containerProps); containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { logger.info("slow: " + message); @@ -233,6 +224,10 @@ public class KafkaMessageListenerContainerTests { } ack.acknowledge(); }); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + container.setBeanName("testSlow"); container.start(); @@ -280,14 +275,15 @@ public class KafkaMessageListenerContainerTests { containerProps.setAckTime(20000); containerProps.setAckCount(20000); containerProps.setAckMode(AckMode.COUNT_TIME); - KafkaMessageListenerContainer container = - new KafkaMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(4); containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { logger.info("slow: " + message); ack.acknowledge(); latch.countDown(); }); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testManualFlushed"); container.start(); @@ -321,8 +317,7 @@ public class KafkaMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("slow3", "false", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic3); - KafkaMessageListenerContainer container = - new KafkaMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(18); final BitSet bitSet = new BitSet(6); final Map faults = new HashMap<>(); @@ -349,6 +344,9 @@ public class KafkaMessageListenerContainerTests { }, buildRetry(), null); containerProps.setMessageListener(adapter); containerProps.setPauseAfter(100); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testSlow3"); container.start(); @@ -382,8 +380,6 @@ public class KafkaMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("slow4", "false", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic4); - KafkaMessageListenerContainer container = - new KafkaMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(18); final BitSet bitSet = new BitSet(6); final Map faults = new HashMap<>(); @@ -418,6 +414,9 @@ public class KafkaMessageListenerContainerTests { }, buildRetry(), null); containerProps.setMessageListener(adapter); containerProps.setPauseAfter(100); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testSlow4"); container.start(); @@ -457,6 +456,7 @@ public class KafkaMessageListenerContainerTests { containerProps.setSyncCommits(true); containerProps.setAckMode(AckMode.RECORD); containerProps.setAckOnError(false); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testRecordAcks"); @@ -522,6 +522,7 @@ public class KafkaMessageListenerContainerTests { containerProps.setAckMode(AckMode.BATCH); containerProps.setPollTimeout(10000); containerProps.setAckOnError(false); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testBatchAcks");