GH-154: Copy ContainerProperties in the Container
Fixes: GH-154 (https://github.com/spring-projects/spring-kafka/issues/154) Previously the `AbstractMessageListenerContainer` save external `ContainerProperties` instance leaving the room for external/internal mutation, which may lead to unexpected behavior * Create a new instance of the `ContainerProperties` in the `AbstractMessageListenerContainer` ctor based on the provided `ContainerProperties` and use `BeanUtils.copyProperties()` for convenience. * Refactoring for tests to meet a new state. Some of them indeed modified `ContainerProperties` after container instantiation. * Remove fake `"propertiesFactory"` topic from the `AbstractKafkaListenerContainerFactory.containerProperties` instance to avoid unexpected behavior. Change that to `(Pattern) null` **Cherry-pick to 1.0.x** Conflicts: spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java Resolved.
This commit is contained in:
committed by
Gary Russell
parent
c703a30714
commit
8a3e562f43
@@ -17,6 +17,8 @@
|
||||
package org.springframework.kafka.config;
|
||||
|
||||
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.context.ApplicationEventPublisherAware;
|
||||
@@ -44,7 +46,7 @@ import org.springframework.retry.support.RetryTemplate;
|
||||
public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K, V>, K, V>
|
||||
implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware {
|
||||
|
||||
private final ContainerProperties containerProperties = new ContainerProperties("propertiesFactory");
|
||||
private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null);
|
||||
|
||||
private ConsumerFactory<K, V> consumerFactory;
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.BeanNameAware;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.context.ApplicationEventPublisherAware;
|
||||
@@ -113,9 +114,28 @@ public abstract class AbstractMessageListenerContainer<K, V>
|
||||
|
||||
protected AbstractMessageListenerContainer(ContainerProperties containerProperties) {
|
||||
Assert.notNull(containerProperties, "'containerProperties' cannot be null");
|
||||
this.containerProperties = containerProperties;
|
||||
if (containerProperties.getConsumerRebalanceListener() == null) {
|
||||
containerProperties.setConsumerRebalanceListener(createConsumerRebalanceListener());
|
||||
|
||||
if (containerProperties.getTopics() != null) {
|
||||
this.containerProperties = new ContainerProperties(containerProperties.getTopics());
|
||||
}
|
||||
else if (containerProperties.getTopicPattern() != null) {
|
||||
this.containerProperties = new ContainerProperties(containerProperties.getTopicPattern());
|
||||
}
|
||||
else {
|
||||
this.containerProperties = new ContainerProperties(containerProperties.getTopicPartitions());
|
||||
}
|
||||
|
||||
BeanUtils.copyProperties(containerProperties, this.containerProperties,
|
||||
"topics", "topicPartitions", "topicPattern", "ackCount", "ackTime");
|
||||
|
||||
if (containerProperties.getAckCount() > 0) {
|
||||
this.containerProperties.setAckCount(containerProperties.getAckCount());
|
||||
}
|
||||
if (containerProperties.getAckTime() > 0) {
|
||||
this.containerProperties.setAckTime(containerProperties.getAckTime());
|
||||
}
|
||||
if (this.containerProperties.getConsumerRebalanceListener() == null) {
|
||||
this.containerProperties.setConsumerRebalanceListener(createConsumerRebalanceListener());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -101,8 +101,7 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka);
|
||||
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
|
||||
ContainerProperties containerProps = new ContainerProperties(topic1);
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(4);
|
||||
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
|
||||
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
|
||||
@@ -110,10 +109,15 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
listenerThreadNames.add(Thread.currentThread().getName());
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
container.setConcurrency(2);
|
||||
container.setBeanName("testAuto");
|
||||
container.start();
|
||||
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
|
||||
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
|
||||
@@ -135,8 +139,7 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
Map<String, Object> props = KafkaTestUtils.consumerProps("test10", "true", embeddedKafka);
|
||||
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
|
||||
ContainerProperties containerProps = new ContainerProperties(topic1);
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(4);
|
||||
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
|
||||
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
|
||||
@@ -162,10 +165,14 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
|
||||
});
|
||||
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
container.setConcurrency(2);
|
||||
container.setBeanName("testAuto");
|
||||
container.start();
|
||||
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
|
||||
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
|
||||
@@ -189,8 +196,7 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
Map<String, Object> props = KafkaTestUtils.consumerProps("test2", "false", embeddedKafka);
|
||||
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
|
||||
ContainerProperties containerProps = new ContainerProperties(topic2);
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(4);
|
||||
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
|
||||
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
|
||||
@@ -198,10 +204,15 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
listenerThreadNames.add(Thread.currentThread().getName());
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
container.setConcurrency(2);
|
||||
container.setBeanName("testBatch");
|
||||
container.start();
|
||||
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
|
||||
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
|
||||
@@ -247,25 +258,25 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
};
|
||||
|
||||
ContainerProperties container1Props = new ContainerProperties(topic1Partition0);
|
||||
ConcurrentMessageListenerContainer<Integer, String> container1 =
|
||||
new ConcurrentMessageListenerContainer<>(cf, container1Props);
|
||||
final CountDownLatch latch1 = new CountDownLatch(2);
|
||||
container1Props.setMessageListener((MessageListener<Integer, String>) message -> {
|
||||
ConcurrentMessageListenerContainerTests.this.logger.info("auto part: " + message);
|
||||
latch1.countDown();
|
||||
});
|
||||
ConcurrentMessageListenerContainer<Integer, String> container1 =
|
||||
new ConcurrentMessageListenerContainer<>(cf, container1Props);
|
||||
container1.setBeanName("b1");
|
||||
container1.start();
|
||||
|
||||
TopicPartitionInitialOffset topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1);
|
||||
ContainerProperties container2Props = new ContainerProperties(topic1Partition1);
|
||||
ConcurrentMessageListenerContainer<Integer, String> container2 =
|
||||
new ConcurrentMessageListenerContainer<>(cf, container2Props);
|
||||
final CountDownLatch latch2 = new CountDownLatch(2);
|
||||
container2Props.setMessageListener((MessageListener<Integer, String>) message -> {
|
||||
ConcurrentMessageListenerContainerTests.this.logger.info("auto part: " + message);
|
||||
latch2.countDown();
|
||||
});
|
||||
ConcurrentMessageListenerContainer<Integer, String> container2 =
|
||||
new ConcurrentMessageListenerContainer<>(cf, container2Props);
|
||||
container2.setBeanName("b2");
|
||||
container2.start();
|
||||
|
||||
@@ -290,14 +301,15 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
cf = new DefaultKafkaConsumerFactory<>(props);
|
||||
// reset earliest
|
||||
ContainerProperties container3Props = new ContainerProperties(topic1Partition0, topic1Partition1);
|
||||
ConcurrentMessageListenerContainer<Integer, String> resettingContainer =
|
||||
new ConcurrentMessageListenerContainer<>(cf, container3Props);
|
||||
resettingContainer.setBeanName("b3");
|
||||
|
||||
final CountDownLatch latch3 = new CountDownLatch(4);
|
||||
container3Props.setMessageListener((MessageListener<Integer, String>) message -> {
|
||||
ConcurrentMessageListenerContainerTests.this.logger.info("auto part e: " + message);
|
||||
latch3.countDown();
|
||||
});
|
||||
ConcurrentMessageListenerContainer<Integer, String> resettingContainer =
|
||||
new ConcurrentMessageListenerContainer<>(cf, container3Props);
|
||||
resettingContainer.setBeanName("b3");
|
||||
resettingContainer.start();
|
||||
assertThat(latch3.await(60, TimeUnit.SECONDS)).isTrue();
|
||||
resettingContainer.stop();
|
||||
@@ -309,8 +321,7 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, -1000L);
|
||||
topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, -1L);
|
||||
ContainerProperties container4Props = new ContainerProperties(topic1Partition0, topic1Partition1);
|
||||
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container4Props);
|
||||
resettingContainer.setBeanName("b4");
|
||||
|
||||
final CountDownLatch latch4 = new CountDownLatch(3);
|
||||
final AtomicReference<String> receivedMessage = new AtomicReference<>();
|
||||
container4Props.setMessageListener((MessageListener<Integer, String>) message -> {
|
||||
@@ -318,6 +329,8 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
receivedMessage.set(message.value());
|
||||
latch4.countDown();
|
||||
});
|
||||
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container4Props);
|
||||
resettingContainer.setBeanName("b4");
|
||||
resettingContainer.start();
|
||||
assertThat(latch4.await(60, TimeUnit.SECONDS)).isTrue();
|
||||
resettingContainer.stop();
|
||||
@@ -332,8 +345,7 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, 1L);
|
||||
topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, 1L);
|
||||
ContainerProperties container5Props = new ContainerProperties(topic1Partition0, topic1Partition1);
|
||||
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container5Props);
|
||||
resettingContainer.setBeanName("b4");
|
||||
|
||||
final CountDownLatch latch5 = new CountDownLatch(4);
|
||||
final List<String> messages = new ArrayList<>();
|
||||
container5Props.setMessageListener((MessageListener<Integer, String>) message -> {
|
||||
@@ -341,11 +353,13 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
messages.add(message.value());
|
||||
latch5.countDown();
|
||||
});
|
||||
|
||||
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container5Props);
|
||||
resettingContainer.setBeanName("b5");
|
||||
resettingContainer.start();
|
||||
assertThat(latch5.await(60, TimeUnit.SECONDS)).isTrue();
|
||||
resettingContainer.stop();
|
||||
assertThat(messages).contains("baz", "qux", "FOO", "BAZ");
|
||||
|
||||
this.logger.info("Stop auto parts");
|
||||
}
|
||||
|
||||
@@ -363,19 +377,22 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
Map<String, Object> props = KafkaTestUtils.consumerProps("test" + ackMode, "false", embeddedKafka);
|
||||
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
|
||||
ContainerProperties containerProps = new ContainerProperties(topic);
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
final CountDownLatch latch = new CountDownLatch(4);
|
||||
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
|
||||
ConcurrentMessageListenerContainerTests.this.logger.info("manual: " + message);
|
||||
ack.acknowledge();
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
container.setConcurrency(2);
|
||||
containerProps.setAckMode(ackMode);
|
||||
container.setBeanName("test" + ackMode);
|
||||
container.start();
|
||||
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
|
||||
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
|
||||
@@ -407,17 +424,14 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
|
||||
ContainerProperties containerProps = new ContainerProperties(topic7);
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
final CountDownLatch latch = new CountDownLatch(8);
|
||||
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
|
||||
ConcurrentMessageListenerContainerTests.this.logger.info("manualExisting: " + message);
|
||||
ack.acknowledge();
|
||||
latch.countDown();
|
||||
});
|
||||
container.setConcurrency(1);
|
||||
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
|
||||
container.setBeanName("testManualExisting");
|
||||
|
||||
final CountDownLatch commits = new CountDownLatch(8);
|
||||
final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
|
||||
containerProps.setCommitCallback((offsets, exception) -> {
|
||||
@@ -426,6 +440,12 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
exceptionRef.compareAndSet(null, exception);
|
||||
}
|
||||
});
|
||||
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
container.setConcurrency(1);
|
||||
container.setBeanName("testManualExisting");
|
||||
|
||||
container.start();
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
|
||||
template.sendDefault(0, "fooo");
|
||||
@@ -457,8 +477,6 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
|
||||
ContainerProperties containerProps = new ContainerProperties(topic8);
|
||||
containerProps.setSyncCommits(true);
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
final CountDownLatch latch = new CountDownLatch(8);
|
||||
final BitSet bitSet = new BitSet(8);
|
||||
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
|
||||
@@ -467,8 +485,11 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
bitSet.set((int) (message.partition() * 4 + message.offset()));
|
||||
latch.countDown();
|
||||
});
|
||||
container.setConcurrency(1);
|
||||
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
|
||||
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
container.setConcurrency(1);
|
||||
container.setBeanName("testManualExisting");
|
||||
container.start();
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
|
||||
@@ -509,9 +530,10 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
|
||||
});
|
||||
ContainerProperties containerProps = new ContainerProperties(topic1PartitionS);
|
||||
containerProps.setMessageListener((MessageListener<Integer, String>) message -> { });
|
||||
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
containerProps.setMessageListener((MessageListener<Integer, String>) message -> { });
|
||||
container.setConcurrency(3);
|
||||
container.start();
|
||||
List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
|
||||
@@ -534,8 +556,6 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
ContainerProperties containerProps2 = new ContainerProperties(topic2);
|
||||
BeanUtils.copyProperties(containerProps, containerProps2,
|
||||
"topics", "topicPartitions", "topicPattern", "ackCount", "ackTime");
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
final CountDownLatch latch = new CountDownLatch(4);
|
||||
final AtomicBoolean catchError = new AtomicBoolean(false);
|
||||
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
|
||||
@@ -543,9 +563,13 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
latch.countDown();
|
||||
throw new RuntimeException("intended");
|
||||
});
|
||||
containerProps.setErrorHandler((thrownException, record) -> catchError.set(true));
|
||||
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
container.setConcurrency(2);
|
||||
container.setBeanName("testException");
|
||||
containerProps.setErrorHandler((thrownException, record) -> catchError.set(true));
|
||||
|
||||
container.start();
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
@@ -620,10 +644,6 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "20000");
|
||||
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
|
||||
ContainerProperties containerProps = new ContainerProperties(topic1);
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
ConcurrentMessageListenerContainer<Integer, String> container2 =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
final CountDownLatch latch = new CountDownLatch(8);
|
||||
final Set<String> listenerThreadNames = Collections.synchronizedSet(new HashSet<String>());
|
||||
List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());
|
||||
@@ -639,6 +659,11 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
listenerThreadNames.add(Thread.currentThread().getName());
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
ConcurrentMessageListenerContainer<Integer, String> container =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
ConcurrentMessageListenerContainer<Integer, String> container2 =
|
||||
new ConcurrentMessageListenerContainer<>(cf, containerProps);
|
||||
container.setConcurrency(1);
|
||||
container2.setConcurrency(1);
|
||||
container.setBeanName("testAuto");
|
||||
|
||||
@@ -54,7 +54,6 @@ import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
|
||||
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
|
||||
import org.springframework.kafka.listener.config.ContainerProperties;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils;
|
||||
import org.springframework.retry.backoff.FixedBackOffPolicy;
|
||||
@@ -100,28 +99,25 @@ public class KafkaMessageListenerContainerTests {
|
||||
// props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 6); // 2 per poll
|
||||
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
|
||||
ContainerProperties containerProps = new ContainerProperties(topic1);
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(6);
|
||||
final BitSet bitSet = new BitSet(6);
|
||||
containerProps.setMessageListener(new MessageListener<Integer, String>() {
|
||||
|
||||
@Override
|
||||
public void onMessage(ConsumerRecord<Integer, String> message) {
|
||||
logger.info("slow1: " + message);
|
||||
bitSet.set((int) (message.partition() * 3 + message.offset()));
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
latch.countDown();
|
||||
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
|
||||
logger.info("slow1: " + message);
|
||||
bitSet.set((int) (message.partition() * 3 + message.offset()));
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
latch.countDown();
|
||||
});
|
||||
container.setBeanName("testSlow1");
|
||||
containerProps.setPauseAfter(100);
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
container.setBeanName("testSlow1");
|
||||
|
||||
|
||||
container.start();
|
||||
Consumer<?, ?> consumer = spyOnConsumer(container);
|
||||
@@ -161,32 +157,29 @@ public class KafkaMessageListenerContainerTests {
|
||||
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
|
||||
ContainerProperties containerProps = new ContainerProperties(topic);
|
||||
containerProps.setSyncCommits(true);
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(6);
|
||||
final BitSet bitSet = new BitSet(4);
|
||||
containerProps.setMessageListener(new AcknowledgingMessageListener<Integer, String>() {
|
||||
|
||||
@Override
|
||||
public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ack) {
|
||||
logger.info("slow2: " + message);
|
||||
bitSet.set((int) (message.partition() * 3 + message.offset()));
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
ack.acknowledge();
|
||||
latch.countDown();
|
||||
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
|
||||
logger.info("slow2: " + message);
|
||||
bitSet.set((int) (message.partition() * 3 + message.offset()));
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
ack.acknowledge();
|
||||
latch.countDown();
|
||||
});
|
||||
container.setBeanName("testSlow2");
|
||||
containerProps.setPauseAfter(100);
|
||||
containerProps.setAckMode(ackMode);
|
||||
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
container.setBeanName("testSlow2");
|
||||
container.start();
|
||||
|
||||
Consumer<?, ?> consumer = spyOnConsumer(container);
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
|
||||
|
||||
@@ -220,8 +213,6 @@ public class KafkaMessageListenerContainerTests {
|
||||
containerProps.setPauseAfter(100);
|
||||
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
|
||||
containerProps.setSyncCommits(true);
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
|
||||
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
|
||||
logger.info("slow: " + message);
|
||||
@@ -233,6 +224,10 @@ public class KafkaMessageListenerContainerTests {
|
||||
}
|
||||
ack.acknowledge();
|
||||
});
|
||||
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
|
||||
container.setBeanName("testSlow");
|
||||
|
||||
container.start();
|
||||
@@ -280,14 +275,15 @@ public class KafkaMessageListenerContainerTests {
|
||||
containerProps.setAckTime(20000);
|
||||
containerProps.setAckCount(20000);
|
||||
containerProps.setAckMode(AckMode.COUNT_TIME);
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(4);
|
||||
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
|
||||
logger.info("slow: " + message);
|
||||
ack.acknowledge();
|
||||
latch.countDown();
|
||||
});
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
container.setBeanName("testManualFlushed");
|
||||
|
||||
container.start();
|
||||
@@ -321,8 +317,7 @@ public class KafkaMessageListenerContainerTests {
|
||||
Map<String, Object> props = KafkaTestUtils.consumerProps("slow3", "false", embeddedKafka);
|
||||
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
|
||||
ContainerProperties containerProps = new ContainerProperties(topic3);
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(18);
|
||||
final BitSet bitSet = new BitSet(6);
|
||||
final Map<String, AtomicInteger> faults = new HashMap<>();
|
||||
@@ -349,6 +344,9 @@ public class KafkaMessageListenerContainerTests {
|
||||
}, buildRetry(), null);
|
||||
containerProps.setMessageListener(adapter);
|
||||
containerProps.setPauseAfter(100);
|
||||
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
container.setBeanName("testSlow3");
|
||||
|
||||
container.start();
|
||||
@@ -382,8 +380,6 @@ public class KafkaMessageListenerContainerTests {
|
||||
Map<String, Object> props = KafkaTestUtils.consumerProps("slow4", "false", embeddedKafka);
|
||||
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
|
||||
ContainerProperties containerProps = new ContainerProperties(topic4);
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
final CountDownLatch latch = new CountDownLatch(18);
|
||||
final BitSet bitSet = new BitSet(6);
|
||||
final Map<String, AtomicInteger> faults = new HashMap<>();
|
||||
@@ -418,6 +414,9 @@ public class KafkaMessageListenerContainerTests {
|
||||
}, buildRetry(), null);
|
||||
containerProps.setMessageListener(adapter);
|
||||
containerProps.setPauseAfter(100);
|
||||
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
container.setBeanName("testSlow4");
|
||||
|
||||
container.start();
|
||||
@@ -457,6 +456,7 @@ public class KafkaMessageListenerContainerTests {
|
||||
containerProps.setSyncCommits(true);
|
||||
containerProps.setAckMode(AckMode.RECORD);
|
||||
containerProps.setAckOnError(false);
|
||||
|
||||
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
|
||||
containerProps);
|
||||
container.setBeanName("testRecordAcks");
|
||||
@@ -522,6 +522,7 @@ public class KafkaMessageListenerContainerTests {
|
||||
containerProps.setAckMode(AckMode.BATCH);
|
||||
containerProps.setPollTimeout(10000);
|
||||
containerProps.setAckOnError(false);
|
||||
|
||||
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
|
||||
containerProps);
|
||||
container.setBeanName("testBatchAcks");
|
||||
|
||||
Reference in New Issue
Block a user