GH-154: Copy ContainerProperties in the Container

Fixes: GH-154 (https://github.com/spring-projects/spring-kafka/issues/154)

Previously the `AbstractMessageListenerContainer` save external `ContainerProperties` instance leaving the room for external/internal mutation,
which may lead to unexpected behavior

* Create a new instance of the `ContainerProperties` in the `AbstractMessageListenerContainer` ctor based on the provided `ContainerProperties`
and use `BeanUtils.copyProperties()` for convenience.
* Refactoring for tests to meet a new state. Some of them indeed modified `ContainerProperties` after container instantiation.
* Remove fake `"propertiesFactory"` topic from the `AbstractKafkaListenerContainerFactory.containerProperties` instance to avoid unexpected behavior.
Change that to `(Pattern) null`

**Cherry-pick to 1.0.x**
This commit is contained in:
Artem Bilan
2016-07-21 18:43:09 -04:00
committed by Gary Russell
parent 86c9fe5b8a
commit 06772d0829
4 changed files with 135 additions and 84 deletions

View File

@@ -17,6 +17,8 @@
package org.springframework.kafka.config;
import java.util.regex.Pattern;
import org.springframework.beans.BeanUtils;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
@@ -44,7 +46,7 @@ import org.springframework.retry.support.RetryTemplate;
public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K, V>, K, V>
implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware {
private final ContainerProperties containerProperties = new ContainerProperties("propertiesFactory");
private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null);
private ConsumerFactory<K, V> consumerFactory;

View File

@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
@@ -113,9 +114,28 @@ public abstract class AbstractMessageListenerContainer<K, V>
protected AbstractMessageListenerContainer(ContainerProperties containerProperties) {
Assert.notNull(containerProperties, "'containerProperties' cannot be null");
this.containerProperties = containerProperties;
if (containerProperties.getConsumerRebalanceListener() == null) {
containerProperties.setConsumerRebalanceListener(createConsumerRebalanceListener());
if (containerProperties.getTopics() != null) {
this.containerProperties = new ContainerProperties(containerProperties.getTopics());
}
else if (containerProperties.getTopicPattern() != null) {
this.containerProperties = new ContainerProperties(containerProperties.getTopicPattern());
}
else {
this.containerProperties = new ContainerProperties(containerProperties.getTopicPartitions());
}
BeanUtils.copyProperties(containerProperties, this.containerProperties,
"topics", "topicPartitions", "topicPattern", "ackCount", "ackTime");
if (containerProperties.getAckCount() > 0) {
this.containerProperties.setAckCount(containerProperties.getAckCount());
}
if (containerProperties.getAckTime() > 0) {
this.containerProperties.setAckTime(containerProperties.getAckTime());
}
if (this.containerProperties.getConsumerRebalanceListener() == null) {
this.containerProperties.setConsumerRebalanceListener(createConsumerRebalanceListener());
}
}

View File

@@ -101,8 +101,7 @@ public class ConcurrentMessageListenerContainerTests {
Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic1);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
@@ -110,10 +109,15 @@ public class ConcurrentMessageListenerContainerTests {
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
});
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setConcurrency(2);
container.setBeanName("testAuto");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
@@ -135,8 +139,7 @@ public class ConcurrentMessageListenerContainerTests {
Map<String, Object> props = KafkaTestUtils.consumerProps("test10", "true", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic1);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
@@ -162,10 +165,14 @@ public class ConcurrentMessageListenerContainerTests {
});
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setConcurrency(2);
container.setBeanName("testAuto");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
@@ -189,8 +196,7 @@ public class ConcurrentMessageListenerContainerTests {
Map<String, Object> props = KafkaTestUtils.consumerProps("test2", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic2);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
@@ -198,10 +204,15 @@ public class ConcurrentMessageListenerContainerTests {
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
});
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setConcurrency(2);
container.setBeanName("testBatch");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
@@ -247,25 +258,25 @@ public class ConcurrentMessageListenerContainerTests {
};
ContainerProperties container1Props = new ContainerProperties(topic1Partition0);
ConcurrentMessageListenerContainer<Integer, String> container1 =
new ConcurrentMessageListenerContainer<>(cf, container1Props);
final CountDownLatch latch1 = new CountDownLatch(2);
container1Props.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("defined part: " + message);
latch1.countDown();
});
ConcurrentMessageListenerContainer<Integer, String> container1 =
new ConcurrentMessageListenerContainer<>(cf, container1Props);
container1.setBeanName("b1");
container1.start();
TopicPartitionInitialOffset topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, 0L);
ContainerProperties container2Props = new ContainerProperties(topic1Partition1);
ConcurrentMessageListenerContainer<Integer, String> container2 =
new ConcurrentMessageListenerContainer<>(cf, container2Props);
final CountDownLatch latch2 = new CountDownLatch(2);
container2Props.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("defined part: " + message);
latch2.countDown();
});
ConcurrentMessageListenerContainer<Integer, String> container2 =
new ConcurrentMessageListenerContainer<>(cf, container2Props);
container2.setBeanName("b2");
container2.start();
@@ -289,14 +300,15 @@ public class ConcurrentMessageListenerContainerTests {
cf = new DefaultKafkaConsumerFactory<>(props);
// reset earliest
ContainerProperties container3Props = new ContainerProperties(topic1Partition0, topic1Partition1);
ConcurrentMessageListenerContainer<Integer, String> resettingContainer =
new ConcurrentMessageListenerContainer<>(cf, container3Props);
resettingContainer.setBeanName("b3");
final CountDownLatch latch3 = new CountDownLatch(4);
container3Props.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("defined part e: " + message);
latch3.countDown();
});
ConcurrentMessageListenerContainer<Integer, String> resettingContainer =
new ConcurrentMessageListenerContainer<>(cf, container3Props);
resettingContainer.setBeanName("b3");
resettingContainer.start();
assertThat(latch3.await(60, TimeUnit.SECONDS)).isTrue();
resettingContainer.stop();
@@ -307,8 +319,7 @@ public class ConcurrentMessageListenerContainerTests {
topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, -1000L);
topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, -1L);
ContainerProperties container4Props = new ContainerProperties(topic1Partition0, topic1Partition1);
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container4Props);
resettingContainer.setBeanName("b4");
final CountDownLatch latch4 = new CountDownLatch(3);
final AtomicReference<String> receivedMessage = new AtomicReference<>();
container4Props.setMessageListener((MessageListener<Integer, String>) message -> {
@@ -316,6 +327,8 @@ public class ConcurrentMessageListenerContainerTests {
receivedMessage.set(message.value());
latch4.countDown();
});
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container4Props);
resettingContainer.setBeanName("b4");
resettingContainer.start();
assertThat(latch4.await(60, TimeUnit.SECONDS)).isTrue();
resettingContainer.stop();
@@ -330,8 +343,7 @@ public class ConcurrentMessageListenerContainerTests {
topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, 1L);
topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, 1L);
ContainerProperties container5Props = new ContainerProperties(topic1Partition0, topic1Partition1);
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container5Props);
resettingContainer.setBeanName("b5");
final CountDownLatch latch5 = new CountDownLatch(4);
final List<String> messages = new ArrayList<>();
container5Props.setMessageListener((MessageListener<Integer, String>) message -> {
@@ -339,6 +351,9 @@ public class ConcurrentMessageListenerContainerTests {
messages.add(message.value());
latch5.countDown();
});
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container5Props);
resettingContainer.setBeanName("b5");
resettingContainer.start();
assertThat(latch5.await(60, TimeUnit.SECONDS)).isTrue();
resettingContainer.stop();
@@ -353,8 +368,7 @@ public class ConcurrentMessageListenerContainerTests {
topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, 1L, true);
topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, -1L, true);
ContainerProperties container6Props = new ContainerProperties(topic1Partition0, topic1Partition1);
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container6Props);
resettingContainer.setBeanName("b6");
final CountDownLatch latch6 = new CountDownLatch(4);
final List<String> messages6 = new ArrayList<>();
container6Props.setMessageListener((MessageListener<Integer, String>) message -> {
@@ -362,6 +376,9 @@ public class ConcurrentMessageListenerContainerTests {
messages6.add(message.value());
latch6.countDown();
});
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container6Props);
resettingContainer.setBeanName("b6");
resettingContainer.start();
assertThat(latch6.await(60, TimeUnit.SECONDS)).isTrue();
resettingContainer.stop();
@@ -385,19 +402,22 @@ public class ConcurrentMessageListenerContainerTests {
Map<String, Object> props = KafkaTestUtils.consumerProps("test" + ackMode, "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
ConcurrentMessageListenerContainerTests.this.logger.info("manual: " + message);
ack.acknowledge();
latch.countDown();
});
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setConcurrency(2);
containerProps.setAckMode(ackMode);
container.setBeanName("test" + ackMode);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
@@ -429,17 +449,14 @@ public class ConcurrentMessageListenerContainerTests {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic7);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(8);
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
ConcurrentMessageListenerContainerTests.this.logger.info("manualExisting: " + message);
ack.acknowledge();
latch.countDown();
});
container.setConcurrency(1);
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
container.setBeanName("testManualExisting");
final CountDownLatch commits = new CountDownLatch(8);
final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
containerProps.setCommitCallback((offsets, exception) -> {
@@ -448,6 +465,12 @@ public class ConcurrentMessageListenerContainerTests {
exceptionRef.compareAndSet(null, exception);
}
});
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setConcurrency(1);
container.setBeanName("testManualExisting");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
template.sendDefault(0, "fooo");
@@ -479,8 +502,6 @@ public class ConcurrentMessageListenerContainerTests {
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
ContainerProperties containerProps = new ContainerProperties(topic8);
containerProps.setSyncCommits(true);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(8);
final BitSet bitSet = new BitSet(8);
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
@@ -489,8 +510,11 @@ public class ConcurrentMessageListenerContainerTests {
bitSet.set((int) (message.partition() * 4 + message.offset()));
latch.countDown();
});
container.setConcurrency(1);
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setConcurrency(1);
container.setBeanName("testManualExisting");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
@@ -531,9 +555,10 @@ public class ConcurrentMessageListenerContainerTests {
});
ContainerProperties containerProps = new ContainerProperties(topic1PartitionS);
containerProps.setMessageListener((MessageListener<Integer, String>) message -> { });
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
containerProps.setMessageListener((MessageListener<Integer, String>) message -> { });
container.setConcurrency(3);
container.start();
List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
@@ -556,8 +581,6 @@ public class ConcurrentMessageListenerContainerTests {
ContainerProperties containerProps2 = new ContainerProperties(topic2);
BeanUtils.copyProperties(containerProps, containerProps2,
"topics", "topicPartitions", "topicPattern", "ackCount", "ackTime");
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
final AtomicBoolean catchError = new AtomicBoolean(false);
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
@@ -565,9 +588,13 @@ public class ConcurrentMessageListenerContainerTests {
latch.countDown();
throw new RuntimeException("intended");
});
containerProps.setErrorHandler((thrownException, record) -> catchError.set(true));
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setConcurrency(2);
container.setBeanName("testException");
containerProps.setErrorHandler((thrownException, record) -> catchError.set(true));
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
@@ -642,10 +669,6 @@ public class ConcurrentMessageListenerContainerTests {
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "20000");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic1);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
ConcurrentMessageListenerContainer<Integer, String> container2 =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(8);
final Set<String> listenerThreadNames = Collections.synchronizedSet(new HashSet<String>());
List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());
@@ -661,6 +684,11 @@ public class ConcurrentMessageListenerContainerTests {
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
});
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
ConcurrentMessageListenerContainer<Integer, String> container2 =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setConcurrency(1);
container2.setConcurrency(1);
container.setBeanName("testAuto");

View File

@@ -55,7 +55,6 @@ import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.retry.backoff.FixedBackOffPolicy;
@@ -101,28 +100,25 @@ public class KafkaMessageListenerContainerTests {
// props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 6); // 2 per poll
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
ContainerProperties containerProps = new ContainerProperties(topic1);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(6);
final BitSet bitSet = new BitSet(6);
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
logger.info("slow1: " + message);
bitSet.set((int) (message.partition() * 3 + message.offset()));
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
latch.countDown();
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
logger.info("slow1: " + message);
bitSet.set((int) (message.partition() * 3 + message.offset()));
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
latch.countDown();
});
container.setBeanName("testSlow1");
containerProps.setPauseAfter(100);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testSlow1");
container.start();
Consumer<?, ?> consumer = spyOnConsumer(container);
@@ -162,32 +158,29 @@ public class KafkaMessageListenerContainerTests {
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
ContainerProperties containerProps = new ContainerProperties(topic);
containerProps.setSyncCommits(true);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(6);
final BitSet bitSet = new BitSet(4);
containerProps.setMessageListener(new AcknowledgingMessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ack) {
logger.info("slow2: " + message);
bitSet.set((int) (message.partition() * 3 + message.offset()));
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
ack.acknowledge();
latch.countDown();
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
logger.info("slow2: " + message);
bitSet.set((int) (message.partition() * 3 + message.offset()));
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
ack.acknowledge();
latch.countDown();
});
container.setBeanName("testSlow2");
containerProps.setPauseAfter(100);
containerProps.setAckMode(ackMode);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testSlow2");
container.start();
Consumer<?, ?> consumer = spyOnConsumer(container);
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
@@ -221,8 +214,6 @@ public class KafkaMessageListenerContainerTests {
containerProps.setPauseAfter(100);
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
containerProps.setSyncCommits(true);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
logger.info("slow: " + message);
@@ -234,6 +225,10 @@ public class KafkaMessageListenerContainerTests {
}
ack.acknowledge();
});
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testSlow");
container.start();
@@ -281,14 +276,15 @@ public class KafkaMessageListenerContainerTests {
containerProps.setAckTime(20000);
containerProps.setAckCount(20000);
containerProps.setAckMode(AckMode.COUNT_TIME);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
logger.info("slow: " + message);
ack.acknowledge();
latch.countDown();
});
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testManualFlushed");
container.start();
@@ -322,8 +318,7 @@ public class KafkaMessageListenerContainerTests {
Map<String, Object> props = KafkaTestUtils.consumerProps("slow3", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
ContainerProperties containerProps = new ContainerProperties(topic3);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(18);
final BitSet bitSet = new BitSet(6);
final Map<String, AtomicInteger> faults = new HashMap<>();
@@ -350,6 +345,9 @@ public class KafkaMessageListenerContainerTests {
}, buildRetry(), null);
containerProps.setMessageListener(adapter);
containerProps.setPauseAfter(100);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testSlow3");
container.start();
@@ -383,8 +381,6 @@ public class KafkaMessageListenerContainerTests {
Map<String, Object> props = KafkaTestUtils.consumerProps("slow4", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
ContainerProperties containerProps = new ContainerProperties(topic4);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(18);
final BitSet bitSet = new BitSet(6);
final Map<String, AtomicInteger> faults = new HashMap<>();
@@ -419,6 +415,9 @@ public class KafkaMessageListenerContainerTests {
}, buildRetry(), null);
containerProps.setMessageListener(adapter);
containerProps.setPauseAfter(100);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testSlow4");
container.start();
@@ -458,6 +457,7 @@ public class KafkaMessageListenerContainerTests {
containerProps.setSyncCommits(true);
containerProps.setAckMode(AckMode.RECORD);
containerProps.setAckOnError(false);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
containerProps);
container.setBeanName("testRecordAcks");
@@ -523,6 +523,7 @@ public class KafkaMessageListenerContainerTests {
containerProps.setAckMode(AckMode.BATCH);
containerProps.setPollTimeout(10000);
containerProps.setAckOnError(false);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
containerProps);
container.setBeanName("testBatchAcks");