GH-185: Dont Populate Ack for Non-Manual Ack Modes

Fixes: https://github.com/spring-projects/spring-kafka/issues/185

If a listener is an `AcknowledgingMessageListener`, do not provide an ack if
the container is not configured for manual acks.
This commit is contained in:
Gary Russell
2016-09-21 13:51:09 -04:00
parent d49f64428b
commit 7f832a876a
3 changed files with 10 additions and 6 deletions

View File

@@ -594,8 +594,8 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
try {
if (this.acknowledgingMessageListener != null) {
this.acknowledgingMessageListener.onMessage(record,
new ConsumerAcknowledgment(record, this.isManualImmediateAck));
this.acknowledgingMessageListener.onMessage(record, this.isAnyManualAck
? new ConsumerAcknowledgment(record, this.isManualImmediateAck) : null);
}
else {
this.listener.onMessage(record);

View File

@@ -393,10 +393,10 @@ public class ConcurrentMessageListenerContainerTests {
latch.countDown();
});
containerProps.setAckMode(ackMode);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setConcurrency(2);
containerProps.setAckMode(ackMode);
container.setBeanName("test" + ackMode);
container.start();

View File

@@ -33,6 +33,7 @@ import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,6 +55,7 @@ import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -103,7 +105,8 @@ public class KafkaMessageListenerContainerTests {
final CountDownLatch latch = new CountDownLatch(6);
final BitSet bitSet = new BitSet(6);
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
final AtomicReference<Acknowledgment> ackNull = new AtomicReference<>();
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
logger.info("slow1: " + message);
bitSet.set((int) (message.partition() * 3 + message.offset()));
try {
@@ -112,6 +115,7 @@ public class KafkaMessageListenerContainerTests {
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
ackNull.set(ack);
latch.countDown();
});
containerProps.setPauseAfter(100);
@@ -138,6 +142,7 @@ public class KafkaMessageListenerContainerTests {
template.sendDefault(2, "buz");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(ackNull.get()).isNull();
assertThat(bitSet.cardinality()).isEqualTo(6);
verify(consumer, atLeastOnce()).pause(any(TopicPartition.class), any(TopicPartition.class));
verify(consumer, atLeastOnce()).resume(any(TopicPartition.class), any(TopicPartition.class));
@@ -278,9 +283,8 @@ public class KafkaMessageListenerContainerTests {
containerProps.setAckMode(AckMode.COUNT_TIME);
final CountDownLatch latch = new CountDownLatch(4);
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
logger.info("slow: " + message);
ack.acknowledge();
latch.countDown();
});
KafkaMessageListenerContainer<Integer, String> container =