diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index e5027c14..1bf5fc87 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -17,6 +17,8 @@ package org.springframework.kafka.config; +import java.util.regex.Pattern; + import org.springframework.beans.BeanUtils; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; @@ -44,7 +46,7 @@ import org.springframework.retry.support.RetryTemplate; public abstract class AbstractKafkaListenerContainerFactory, K, V> implements KafkaListenerContainerFactory, ApplicationEventPublisherAware { - private final ContainerProperties containerProperties = new ContainerProperties("propertiesFactory"); + private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null); private ConsumerFactory consumerFactory; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 171e7c73..ae125221 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.BeanNameAware; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; @@ -113,9 +114,28 @@ public abstract class AbstractMessageListenerContainer protected AbstractMessageListenerContainer(ContainerProperties containerProperties) { Assert.notNull(containerProperties, "'containerProperties' cannot be null"); - this.containerProperties = containerProperties; - if (containerProperties.getConsumerRebalanceListener() == null) { - containerProperties.setConsumerRebalanceListener(createConsumerRebalanceListener()); + + if (containerProperties.getTopics() != null) { + this.containerProperties = new ContainerProperties(containerProperties.getTopics()); + } + else if (containerProperties.getTopicPattern() != null) { + this.containerProperties = new ContainerProperties(containerProperties.getTopicPattern()); + } + else { + this.containerProperties = new ContainerProperties(containerProperties.getTopicPartitions()); + } + + BeanUtils.copyProperties(containerProperties, this.containerProperties, + "topics", "topicPartitions", "topicPattern", "ackCount", "ackTime"); + + if (containerProperties.getAckCount() > 0) { + this.containerProperties.setAckCount(containerProperties.getAckCount()); + } + if (containerProperties.getAckTime() > 0) { + this.containerProperties.setAckTime(containerProperties.getAckTime()); + } + if (this.containerProperties.getConsumerRebalanceListener() == null) { + this.containerProperties.setConsumerRebalanceListener(createConsumerRebalanceListener()); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index dd016e74..8148a50e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -101,8 +101,7 @@ public class ConcurrentMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic1); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(4); final Set listenerThreadNames = new ConcurrentSkipListSet<>(); containerProps.setMessageListener((MessageListener) message -> { @@ -110,10 +109,15 @@ public class ConcurrentMessageListenerContainerTests { listenerThreadNames.add(Thread.currentThread().getName()); latch.countDown(); }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); container.setConcurrency(2); container.setBeanName("testAuto"); container.start(); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); @@ -135,8 +139,7 @@ public class ConcurrentMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("test10", "true", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic1); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(4); final Set listenerThreadNames = new ConcurrentSkipListSet<>(); containerProps.setMessageListener((MessageListener) message -> { @@ -162,10 +165,14 @@ public class ConcurrentMessageListenerContainerTests { }); + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); container.setConcurrency(2); container.setBeanName("testAuto"); container.start(); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); @@ -189,8 +196,7 @@ public class ConcurrentMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("test2", "false", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic2); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(4); final Set listenerThreadNames = new ConcurrentSkipListSet<>(); containerProps.setMessageListener((MessageListener) message -> { @@ -198,10 +204,15 @@ public class ConcurrentMessageListenerContainerTests { listenerThreadNames.add(Thread.currentThread().getName()); latch.countDown(); }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); container.setConcurrency(2); container.setBeanName("testBatch"); container.start(); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); @@ -247,25 +258,25 @@ public class ConcurrentMessageListenerContainerTests { }; ContainerProperties container1Props = new ContainerProperties(topic1Partition0); - ConcurrentMessageListenerContainer container1 = - new ConcurrentMessageListenerContainer<>(cf, container1Props); final CountDownLatch latch1 = new CountDownLatch(2); container1Props.setMessageListener((MessageListener) message -> { ConcurrentMessageListenerContainerTests.this.logger.info("defined part: " + message); latch1.countDown(); }); + ConcurrentMessageListenerContainer container1 = + new ConcurrentMessageListenerContainer<>(cf, container1Props); container1.setBeanName("b1"); container1.start(); TopicPartitionInitialOffset topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, 0L); ContainerProperties container2Props = new ContainerProperties(topic1Partition1); - ConcurrentMessageListenerContainer container2 = - new ConcurrentMessageListenerContainer<>(cf, container2Props); final CountDownLatch latch2 = new CountDownLatch(2); container2Props.setMessageListener((MessageListener) message -> { ConcurrentMessageListenerContainerTests.this.logger.info("defined part: " + message); latch2.countDown(); }); + ConcurrentMessageListenerContainer container2 = + new ConcurrentMessageListenerContainer<>(cf, container2Props); container2.setBeanName("b2"); container2.start(); @@ -289,14 +300,15 @@ public class ConcurrentMessageListenerContainerTests { cf = new DefaultKafkaConsumerFactory<>(props); // reset earliest ContainerProperties container3Props = new ContainerProperties(topic1Partition0, topic1Partition1); - ConcurrentMessageListenerContainer resettingContainer = - new ConcurrentMessageListenerContainer<>(cf, container3Props); - resettingContainer.setBeanName("b3"); + final CountDownLatch latch3 = new CountDownLatch(4); container3Props.setMessageListener((MessageListener) message -> { ConcurrentMessageListenerContainerTests.this.logger.info("defined part e: " + message); latch3.countDown(); }); + ConcurrentMessageListenerContainer resettingContainer = + new ConcurrentMessageListenerContainer<>(cf, container3Props); + resettingContainer.setBeanName("b3"); resettingContainer.start(); assertThat(latch3.await(60, TimeUnit.SECONDS)).isTrue(); resettingContainer.stop(); @@ -307,8 +319,7 @@ public class ConcurrentMessageListenerContainerTests { topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, -1000L); topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, -1L); ContainerProperties container4Props = new ContainerProperties(topic1Partition0, topic1Partition1); - resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container4Props); - resettingContainer.setBeanName("b4"); + final CountDownLatch latch4 = new CountDownLatch(3); final AtomicReference receivedMessage = new AtomicReference<>(); container4Props.setMessageListener((MessageListener) message -> { @@ -316,6 +327,8 @@ public class ConcurrentMessageListenerContainerTests { receivedMessage.set(message.value()); latch4.countDown(); }); + resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container4Props); + resettingContainer.setBeanName("b4"); resettingContainer.start(); assertThat(latch4.await(60, TimeUnit.SECONDS)).isTrue(); resettingContainer.stop(); @@ -330,8 +343,7 @@ public class ConcurrentMessageListenerContainerTests { topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, 1L); topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, 1L); ContainerProperties container5Props = new ContainerProperties(topic1Partition0, topic1Partition1); - resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container5Props); - resettingContainer.setBeanName("b5"); + final CountDownLatch latch5 = new CountDownLatch(4); final List messages = new ArrayList<>(); container5Props.setMessageListener((MessageListener) message -> { @@ -339,6 +351,9 @@ public class ConcurrentMessageListenerContainerTests { messages.add(message.value()); latch5.countDown(); }); + + resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container5Props); + resettingContainer.setBeanName("b5"); resettingContainer.start(); assertThat(latch5.await(60, TimeUnit.SECONDS)).isTrue(); resettingContainer.stop(); @@ -353,8 +368,7 @@ public class ConcurrentMessageListenerContainerTests { topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, 1L, true); topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, -1L, true); ContainerProperties container6Props = new ContainerProperties(topic1Partition0, topic1Partition1); - resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container6Props); - resettingContainer.setBeanName("b6"); + final CountDownLatch latch6 = new CountDownLatch(4); final List messages6 = new ArrayList<>(); container6Props.setMessageListener((MessageListener) message -> { @@ -362,6 +376,9 @@ public class ConcurrentMessageListenerContainerTests { messages6.add(message.value()); latch6.countDown(); }); + + resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container6Props); + resettingContainer.setBeanName("b6"); resettingContainer.start(); assertThat(latch6.await(60, TimeUnit.SECONDS)).isTrue(); resettingContainer.stop(); @@ -385,19 +402,22 @@ public class ConcurrentMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("test" + ackMode, "false", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(4); containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { ConcurrentMessageListenerContainerTests.this.logger.info("manual: " + message); ack.acknowledge(); latch.countDown(); }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); container.setConcurrency(2); containerProps.setAckMode(ackMode); container.setBeanName("test" + ackMode); container.start(); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); @@ -429,17 +449,14 @@ public class ConcurrentMessageListenerContainerTests { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic7); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(8); containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { ConcurrentMessageListenerContainerTests.this.logger.info("manualExisting: " + message); ack.acknowledge(); latch.countDown(); }); - container.setConcurrency(1); containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE); - container.setBeanName("testManualExisting"); + final CountDownLatch commits = new CountDownLatch(8); final AtomicReference exceptionRef = new AtomicReference<>(); containerProps.setCommitCallback((offsets, exception) -> { @@ -448,6 +465,12 @@ public class ConcurrentMessageListenerContainerTests { exceptionRef.compareAndSet(null, exception); } }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); + container.setConcurrency(1); + container.setBeanName("testManualExisting"); + container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); template.sendDefault(0, "fooo"); @@ -479,8 +502,6 @@ public class ConcurrentMessageListenerContainerTests { DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic8); containerProps.setSyncCommits(true); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(8); final BitSet bitSet = new BitSet(8); containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { @@ -489,8 +510,11 @@ public class ConcurrentMessageListenerContainerTests { bitSet.set((int) (message.partition() * 4 + message.offset())); latch.countDown(); }); - container.setConcurrency(1); containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); + container.setConcurrency(1); container.setBeanName("testManualExisting"); container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); @@ -531,9 +555,10 @@ public class ConcurrentMessageListenerContainerTests { }); ContainerProperties containerProps = new ContainerProperties(topic1PartitionS); + containerProps.setMessageListener((MessageListener) message -> { }); + ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(cf, containerProps); - containerProps.setMessageListener((MessageListener) message -> { }); container.setConcurrency(3); container.start(); List> containers = KafkaTestUtils.getPropertyValue(container, @@ -556,8 +581,6 @@ public class ConcurrentMessageListenerContainerTests { ContainerProperties containerProps2 = new ContainerProperties(topic2); BeanUtils.copyProperties(containerProps, containerProps2, "topics", "topicPartitions", "topicPattern", "ackCount", "ackTime"); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(4); final AtomicBoolean catchError = new AtomicBoolean(false); containerProps.setMessageListener((MessageListener) message -> { @@ -565,9 +588,13 @@ public class ConcurrentMessageListenerContainerTests { latch.countDown(); throw new RuntimeException("intended"); }); + containerProps.setErrorHandler((thrownException, record) -> catchError.set(true)); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); container.setConcurrency(2); container.setBeanName("testException"); - containerProps.setErrorHandler((thrownException, record) -> catchError.set(true)); + container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); @@ -642,10 +669,6 @@ public class ConcurrentMessageListenerContainerTests { props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "20000"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic1); - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>(cf, containerProps); - ConcurrentMessageListenerContainer container2 = - new ConcurrentMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(8); final Set listenerThreadNames = Collections.synchronizedSet(new HashSet()); List receivedMessages = Collections.synchronizedList(new ArrayList<>()); @@ -661,6 +684,11 @@ public class ConcurrentMessageListenerContainerTests { listenerThreadNames.add(Thread.currentThread().getName()); latch.countDown(); }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); + ConcurrentMessageListenerContainer container2 = + new ConcurrentMessageListenerContainer<>(cf, containerProps); container.setConcurrency(1); container2.setConcurrency(1); container.setBeanName("testAuto"); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 66f2364a..8c009e84 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -55,7 +55,6 @@ import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter; import org.springframework.kafka.listener.config.ContainerProperties; -import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.retry.backoff.FixedBackOffPolicy; @@ -101,28 +100,25 @@ public class KafkaMessageListenerContainerTests { // props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 6); // 2 per poll DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic1); - KafkaMessageListenerContainer container = - new KafkaMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(6); final BitSet bitSet = new BitSet(6); - containerProps.setMessageListener(new MessageListener() { - - @Override - public void onMessage(ConsumerRecord message) { - logger.info("slow1: " + message); - bitSet.set((int) (message.partition() * 3 + message.offset())); - try { - Thread.sleep(1000); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - latch.countDown(); + containerProps.setMessageListener((MessageListener) message -> { + logger.info("slow1: " + message); + bitSet.set((int) (message.partition() * 3 + message.offset())); + try { + Thread.sleep(1000); } - + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + latch.countDown(); }); - container.setBeanName("testSlow1"); containerProps.setPauseAfter(100); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + container.setBeanName("testSlow1"); + container.start(); Consumer consumer = spyOnConsumer(container); @@ -162,32 +158,29 @@ public class KafkaMessageListenerContainerTests { DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic); containerProps.setSyncCommits(true); - KafkaMessageListenerContainer container = - new KafkaMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(6); final BitSet bitSet = new BitSet(4); - containerProps.setMessageListener(new AcknowledgingMessageListener() { - - @Override - public void onMessage(ConsumerRecord message, Acknowledgment ack) { - logger.info("slow2: " + message); - bitSet.set((int) (message.partition() * 3 + message.offset())); - try { - Thread.sleep(1000); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - ack.acknowledge(); - latch.countDown(); + containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { + logger.info("slow2: " + message); + bitSet.set((int) (message.partition() * 3 + message.offset())); + try { + Thread.sleep(1000); } - + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + ack.acknowledge(); + latch.countDown(); }); - container.setBeanName("testSlow2"); containerProps.setPauseAfter(100); containerProps.setAckMode(ackMode); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + container.setBeanName("testSlow2"); container.start(); + Consumer consumer = spyOnConsumer(container); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); @@ -221,8 +214,6 @@ public class KafkaMessageListenerContainerTests { containerProps.setPauseAfter(100); containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE); containerProps.setSyncCommits(true); - KafkaMessageListenerContainer container = - new KafkaMessageListenerContainer<>(cf, containerProps); containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { logger.info("slow: " + message); @@ -234,6 +225,10 @@ public class KafkaMessageListenerContainerTests { } ack.acknowledge(); }); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + container.setBeanName("testSlow"); container.start(); @@ -281,14 +276,15 @@ public class KafkaMessageListenerContainerTests { containerProps.setAckTime(20000); containerProps.setAckCount(20000); containerProps.setAckMode(AckMode.COUNT_TIME); - KafkaMessageListenerContainer container = - new KafkaMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(4); containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { logger.info("slow: " + message); ack.acknowledge(); latch.countDown(); }); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testManualFlushed"); container.start(); @@ -322,8 +318,7 @@ public class KafkaMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("slow3", "false", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic3); - KafkaMessageListenerContainer container = - new KafkaMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(18); final BitSet bitSet = new BitSet(6); final Map faults = new HashMap<>(); @@ -350,6 +345,9 @@ public class KafkaMessageListenerContainerTests { }, buildRetry(), null); containerProps.setMessageListener(adapter); containerProps.setPauseAfter(100); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testSlow3"); container.start(); @@ -383,8 +381,6 @@ public class KafkaMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("slow4", "false", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic4); - KafkaMessageListenerContainer container = - new KafkaMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(18); final BitSet bitSet = new BitSet(6); final Map faults = new HashMap<>(); @@ -419,6 +415,9 @@ public class KafkaMessageListenerContainerTests { }, buildRetry(), null); containerProps.setMessageListener(adapter); containerProps.setPauseAfter(100); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testSlow4"); container.start(); @@ -458,6 +457,7 @@ public class KafkaMessageListenerContainerTests { containerProps.setSyncCommits(true); containerProps.setAckMode(AckMode.RECORD); containerProps.setAckOnError(false); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testRecordAcks"); @@ -523,6 +523,7 @@ public class KafkaMessageListenerContainerTests { containerProps.setAckMode(AckMode.BATCH); containerProps.setPollTimeout(10000); containerProps.setAckOnError(false); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testBatchAcks");