GH-105: Externalize Retry

Resolves #105

Extract the retry template into adapters.

Polishing

Redo `ConcurrentMessageListenerContainerTests` to Java 8 style.
Change `ArrayList` for thread names to the `ConcurrentSkipListSet` which is synchronized
This commit is contained in:
Gary Russell
2016-06-01 17:06:01 -04:00
committed by Artem Bilan
parent be8b438fdd
commit e7240e207d
14 changed files with 489 additions and 319 deletions

View File

@@ -25,6 +25,8 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
/**
* Base {@link KafkaListenerContainerFactory} for Spring's base container implementation.
@@ -55,6 +57,10 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
private Boolean ackDiscarded;
private RetryTemplate retryTemplate;
private RecoveryCallback<Void> recoveryCallback;
private ApplicationEventPublisher applicationEventPublisher;
/**
@@ -111,6 +117,23 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
this.ackDiscarded = ackDiscarded;
}
/**
* Set a retryTemplate.
* @param retryTemplate the template.
*/
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}
/**
* Set a callback to be used with the {@link #setRetryTemplate(RetryTemplate)
* retryTemplate}.
* @param recoveryCallback the callback.
*/
public void setRecoveryCallback(RecoveryCallback<Void> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
@@ -144,11 +167,18 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
}
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
AbstractKafkaListenerEndpoint<K, V> aklEndpoint = (AbstractKafkaListenerEndpoint<K, V>) endpoint;
if (this.recordFilterStrategy != null) {
((AbstractKafkaListenerEndpoint<K, V>) endpoint).setRecordFilterStrategy(this.recordFilterStrategy);
aklEndpoint.setRecordFilterStrategy(this.recordFilterStrategy);
}
if (this.ackDiscarded != null) {
((AbstractKafkaListenerEndpoint<K, V>) endpoint).setAckDiscarded(this.ackDiscarded);
aklEndpoint.setAckDiscarded(this.ackDiscarded);
}
if (this.retryTemplate != null) {
aklEndpoint.setRetryTemplate(this.retryTemplate);
}
if (this.recoveryCallback != null) {
aklEndpoint.setRecoveryCallback(this.recoveryCallback);
}
}

View File

@@ -37,7 +37,11 @@ import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
/**
@@ -75,6 +79,9 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
private boolean ackDiscarded;
private RetryTemplate retryTemplate;
private RecoveryCallback<Void> recoveryCallback;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
@@ -218,14 +225,39 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
}
/**
* Set to true if the {@link #setRecordFilterStrategy(RecordFilterStrategy) recordFilterStrategy}
* is in use.
* Set to true if the {@link #setRecordFilterStrategy(RecordFilterStrategy)
* recordFilterStrategy} is in use.
* @param ackDiscarded the ackDiscarded.
*/
public void setAckDiscarded(boolean ackDiscarded) {
this.ackDiscarded = ackDiscarded;
}
protected RetryTemplate getRetryTemplate() {
return this.retryTemplate;
}
/**
* Set a retryTemplate.
* @param retryTemplate the template.
*/
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}
protected RecoveryCallback<?> getRecoveryCallback() {
return this.recoveryCallback;
}
/**
* Set a callback to be used with the {@link #setRetryTemplate(RetryTemplate)
* retryTemplate}.
* @param recoveryCallback the callback.
*/
public void setRecoveryCallback(RecoveryCallback<Void> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}
@Override
public void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter) {
setupMessageListener(listenerContainer, messageConverter);
@@ -241,24 +273,33 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
protected abstract MessageListener<K, V> createMessageListener(MessageListenerContainer container,
MessageConverter messageConverter);
@SuppressWarnings("unchecked")
private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
MessageListener<K, V> messageListener = createMessageListener(container, messageConverter);
Object messageListener = createMessageListener(container, messageConverter);
Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
if (this.recordFilterStrategy != null) {
if (this.retryTemplate != null) {
if (messageListener instanceof AcknowledgingMessageListener) {
@SuppressWarnings("unchecked")
AcknowledgingMessageListener<K, V> aml = (AcknowledgingMessageListener<K, V>) messageListener;
aml = new FilteringAcknowledgingMessageListenerAdapter<>(this.recordFilterStrategy, aml, this.ackDiscarded);
container.setupMessageListener(aml);
messageListener = new RetryingAcknowledgingMessageListenerAdapter<>(
(AcknowledgingMessageListener<K, V>) messageListener, this.retryTemplate,
this.recoveryCallback);
}
else {
messageListener = new FilteringMessageListenerAdapter<>(this.recordFilterStrategy, messageListener);
container.setupMessageListener(messageListener);
messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
this.retryTemplate, this.recoveryCallback);
}
}
else {
container.setupMessageListener(messageListener);
if (this.recordFilterStrategy != null) {
if (messageListener instanceof AcknowledgingMessageListener) {
messageListener = new FilteringAcknowledgingMessageListenerAdapter<>(
(AcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
this.ackDiscarded);
}
else {
messageListener = new FilteringMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
this.recordFilterStrategy);
}
}
container.setupMessageListener(messageListener);
}
/**

View File

@@ -24,7 +24,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.BeanNameAware;
@@ -32,8 +31,6 @@ import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.util.Assert;
/**
@@ -187,28 +184,6 @@ public abstract class AbstractMessageListenerContainer<K, V>
|| this.containerProperties.getMessageListener() instanceof AcknowledgingMessageListener,
"Either a " + MessageListener.class.getName() + " or a "
+ AcknowledgingMessageListener.class.getName() + " must be provided");
if (this.containerProperties.getRecoveryCallback() == null) {
this.containerProperties.setRecoveryCallback(new RecoveryCallback<Void>() {
@Override
public Void recover(RetryContext context) throws Exception {
@SuppressWarnings("unchecked")
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) context.getAttribute("record");
Throwable lastThrowable = context.getLastThrowable();
if (AbstractMessageListenerContainer.this.containerProperties.getErrorHandler() != null
&& lastThrowable instanceof Exception) {
AbstractMessageListenerContainer.this.containerProperties.getErrorHandler()
.handle((Exception) lastThrowable, record);
}
else {
AbstractMessageListenerContainer.this.logger.error(
"Listener threw an exception and no error handler for " + record, lastThrowable);
}
return null;
}
});
}
doStart();
}
}

View File

@@ -51,8 +51,6 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
@@ -561,41 +559,10 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
try {
if (this.acknowledgingMessageListener != null) {
if (this.containerProperties.getRetryTemplate() != null) {
this.containerProperties.getRetryTemplate().execute(
new RetryCallback<Void, KafkaException>() {
@Override
public Void doWithRetry(RetryContext context) throws KafkaException {
context.setAttribute("record", record);
ListenerConsumer.this.acknowledgingMessageListener.onMessage(record,
new ConsumerAcknowledgment(record));
return null;
}
}, this.containerProperties.getRecoveryCallback());
}
else {
this.acknowledgingMessageListener.onMessage(record, new ConsumerAcknowledgment(record));
}
this.acknowledgingMessageListener.onMessage(record, new ConsumerAcknowledgment(record));
}
else {
if (this.containerProperties.getRetryTemplate() != null) {
this.containerProperties.getRetryTemplate().execute(
new RetryCallback<Void, KafkaException>() {
@Override
public Void doWithRetry(RetryContext context) throws KafkaException {
context.setAttribute("record", record);
ListenerConsumer.this.listener.onMessage(record);
return null;
}
}, this.containerProperties.getRecoveryCallback());
}
else {
this.listener.onMessage(record);
}
this.listener.onMessage(record);
}
this.acks.add(record);
if (this.isManualImmediateAck || this.isRecordAck) {

View File

@@ -0,0 +1,73 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.listener.adapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
/**
* Base class for retrying message listener adapters.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public class AbstractRetryingMessageListenerAdapter<K, V> {
protected final Log logger = LogFactory.getLog(this.getClass());
private final RetryTemplate retryTemplate;
private final RecoveryCallback<Void> recoveryCallback;
/**
* Construct an instance with the supplied retry template. The exception will be
* thrown to the container after retries are exhausted.
* @param retryTemplate the template.
*/
public AbstractRetryingMessageListenerAdapter(RetryTemplate retryTemplate) {
this(retryTemplate, null);
}
/**
* Construct an instance with the supplied template and callback.
* @param retryTemplate the template.
* @param recoveryCallback the recovery callback; if null, the exception will be
* thrown to the container after retries are exhausted.
*/
public AbstractRetryingMessageListenerAdapter(RetryTemplate retryTemplate,
RecoveryCallback<Void> recoveryCallback) {
Assert.notNull(retryTemplate, "'retryTemplate' cannot be null");
this.retryTemplate = retryTemplate;
this.recoveryCallback = recoveryCallback;
}
public RetryTemplate getRetryTemplate() {
return this.retryTemplate;
}
public RecoveryCallback<Void> getRecoveryCallback() {
return this.recoveryCallback;
}
}

View File

@@ -40,12 +40,12 @@ public class FilteringAcknowledgingMessageListenerAdapter<K, V> extends Abstract
/**
* Create an instance with the supplied strategy and delegate listener.
* @param recordFilterStrategy the filter.
* @param delegate the delegate.
* @param recordFilterStrategy the filter.
* @param ackDiscarded true to ack (commit offset for) discarded messages.
*/
public FilteringAcknowledgingMessageListenerAdapter(RecordFilterStrategy<K, V> recordFilterStrategy,
AcknowledgingMessageListener<K, V> delegate, boolean ackDiscarded) {
public FilteringAcknowledgingMessageListenerAdapter(AcknowledgingMessageListener<K, V> delegate,
RecordFilterStrategy<K, V> recordFilterStrategy, boolean ackDiscarded) {
super(recordFilterStrategy);
this.delegate = delegate;
this.ackDiscarded = ackDiscarded;

View File

@@ -37,11 +37,11 @@ public class FilteringMessageListenerAdapter<K, V> extends AbstractFilteringMess
/**
* Create an instance with the supplied strategy and delegate listener.
* @param recordFilterStrategy the filter.
* @param delegate the delegate.
* @param recordFilterStrategy the filter.
*/
public FilteringMessageListenerAdapter(RecordFilterStrategy<K, V> recordFilterStrategy,
MessageListener<K, V> delegate) {
public FilteringMessageListenerAdapter(MessageListener<K, V> delegate,
RecordFilterStrategy<K, V> recordFilterStrategy) {
super(recordFilterStrategy);
this.delegate = delegate;
}

View File

@@ -0,0 +1,83 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.listener.adapter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
/**
* A retrying message listener adapter for {@link MessageListener}s.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*/
public class RetryingAcknowledgingMessageListenerAdapter<K, V> extends AbstractRetryingMessageListenerAdapter<K, V>
implements AcknowledgingMessageListener<K, V> {
private final AcknowledgingMessageListener<K, V> delegate;
/**
* Construct an instance with the provided template and delegate. The exception will
* be thrown to the container after retries are exhausted.
* @param messageListener the listener delegate.
* @param retryTemplate the template.
*/
public RetryingAcknowledgingMessageListenerAdapter(AcknowledgingMessageListener<K, V> messageListener,
RetryTemplate retryTemplate) {
this(messageListener, retryTemplate, null);
}
/**
* Construct an instance with the provided template, callback and delegate.
* @param messageListener the listener delegate.
* @param retryTemplate the template.
* @param recoveryCallback the recovery callback; if null, the exception will be
* thrown to the container after retries are exhausted.
*/
public RetryingAcknowledgingMessageListenerAdapter(AcknowledgingMessageListener<K, V> messageListener,
RetryTemplate retryTemplate, RecoveryCallback<Void> recoveryCallback) {
super(retryTemplate, recoveryCallback);
Assert.notNull(messageListener, "'messageListener' cannot be null");
this.delegate = messageListener;
}
@Override
public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment acknowledgment) {
getRetryTemplate().execute(new RetryCallback<Void, KafkaException>() {
@Override
public Void doWithRetry(RetryContext context) throws KafkaException {
context.setAttribute("record", record);
RetryingAcknowledgingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
return null;
}
}, getRecoveryCallback());
}
}

View File

@@ -0,0 +1,81 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.listener.adapter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
/**
* A retrying message listener adapter for {@link MessageListener}s.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public class RetryingMessageListenerAdapter<K, V> extends AbstractRetryingMessageListenerAdapter<K, V>
implements MessageListener<K, V> {
private final MessageListener<K, V> delegate;
/**
* Construct an instance with the provided template and delegate. The exception will
* be thrown to the container after retries are exhausted.
* @param messageListener the delegate listener.
* @param retryTemplate the template.
*/
public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate) {
this(messageListener, retryTemplate, null);
}
/**
* Construct an instance with the provided template, callback and delegate.
* @param messageListener the delegate listener.
* @param retryTemplate the template.
* @param recoveryCallback the recovery callback; if null, the exception will be
* thrown to the container after retries are exhausted.
*/
public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate,
RecoveryCallback<Void> recoveryCallback) {
super(retryTemplate, recoveryCallback);
Assert.notNull(messageListener, "'messageListener' cannot be null");
this.delegate = messageListener;
}
@Override
public void onMessage(final ConsumerRecord<K, V> record) {
getRetryTemplate().execute(new RetryCallback<Void, KafkaException>() {
@Override
public Void doWithRetry(RetryContext context) throws KafkaException {
context.setAttribute("record", record);
RetryingMessageListenerAdapter.this.delegate.onMessage(record);
return null;
}
}, getRecoveryCallback());
}
}

View File

@@ -31,8 +31,6 @@ import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.LoggingErrorHandler;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
/**
@@ -131,17 +129,6 @@ public class ContainerProperties {
*/
private boolean pauseEnabled = true;
/**
* A retry template to retry deliveries.
*/
private RetryTemplate retryTemplate;
/**
* A recovery callback to be invoked when retries are exhausted. By default
* the error handler is invoked.
*/
private RecoveryCallback<Void> recoveryCallback;
/**
* Set the queue depth for handoffs from the consumer thread to the listener
* thread. Default 1 (up to 2 in process).
@@ -304,23 +291,6 @@ public class ContainerProperties {
this.pauseEnabled = pauseEnabled;
}
/**
* Set a retry template to retry deliveries.
* @param retryTemplate the retry template.
*/
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}
/**
* Set a recovery callback to be invoked when retries are exhausted. By default
* the error handler is invoked.
* @param recoveryCallback the recovery callback.
*/
public void setRecoveryCallback(RecoveryCallback<Void> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}
/**
* Set the queue depth for handoffs from the consumer thread to the listener
* thread. Default 1 (up to 2 in process).
@@ -454,14 +424,6 @@ public class ContainerProperties {
return this.pauseEnabled;
}
public RetryTemplate getRetryTemplate() {
return this.retryTemplate;
}
public RecoveryCallback<Void> getRecoveryCallback() {
return this.recoveryCallback;
}
public int getQueueDepth() {
return this.queueDepth;
}

View File

@@ -51,7 +51,9 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
@@ -61,6 +63,9 @@ import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@@ -134,6 +139,14 @@ public class EnableKafkaIntegrationTests {
.isInstanceOf(FilteringAcknowledgingMessageListenerAdapter.class);
assertThat(KafkaTestUtils.getPropertyValue(manualContainer, "containerProperties.messageListener.ackDiscarded",
Boolean.class)).isTrue();
assertThat(KafkaTestUtils.getPropertyValue(manualContainer, "containerProperties.messageListener.delegate"))
.isInstanceOf(RetryingAcknowledgingMessageListenerAdapter.class);
assertThat(KafkaTestUtils
.getPropertyValue(manualContainer, "containerProperties.messageListener.delegate.recoveryCallback")
.getClass().getName()).contains("EnableKafkaIntegrationTests$Config$");
assertThat(KafkaTestUtils.getPropertyValue(manualContainer,
"containerProperties.messageListener.delegate.delegate"))
.isInstanceOf(MessagingMessageListenerAdapter.class);
template.send("annotated5", 0, 0, "foo");
template.send("annotated5", 1, 0, "bar");
@@ -254,6 +267,15 @@ public class EnableKafkaIntegrationTests {
props.setIdleEventInterval(100L);
factory.setRecordFilterStrategy(manualFilter());
factory.setAckDiscarded(true);
factory.setRetryTemplate(new RetryTemplate());
factory.setRecoveryCallback(new RecoveryCallback<Void>() {
@Override
public Void recover(RetryContext context) throws Exception {
return null;
}
});
return factory;
}

View File

@@ -21,28 +21,25 @@ import static org.mockito.BDDMockito.given;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.junit.ClassRule;
import org.junit.Ignore;
@@ -58,7 +55,6 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -99,26 +95,23 @@ public class ConcurrentMessageListenerContainerTests {
public void testAutoCommit() throws Exception {
this.logger.info("Start auto");
Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic1);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
final List<String> listenerThreadNames = new ArrayList<>();
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
}
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
});
container.setConcurrency(2);
container.setBeanName("testAuto");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic1);
template.sendDefault(0, "foo");
@@ -136,20 +129,16 @@ public class ConcurrentMessageListenerContainerTests {
public void testAutoCommitWithRebalanceListener() throws Exception {
this.logger.info("Start auto");
Map<String, Object> props = KafkaTestUtils.consumerProps("test10", "true", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic1);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
final List<String> listenerThreadNames = new ArrayList<>();
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
}
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
});
final CountDownLatch rebalancePartitionsAssignedLatch = new CountDownLatch(2);
final CountDownLatch rebalancePartitionsRevokedLatch = new CountDownLatch(2);
@@ -174,7 +163,7 @@ public class ConcurrentMessageListenerContainerTests {
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic1);
template.sendDefault(0, "foo");
@@ -194,27 +183,23 @@ public class ConcurrentMessageListenerContainerTests {
public void testAfterListenCommit() throws Exception {
this.logger.info("Start manual");
Map<String, Object> props = KafkaTestUtils.consumerProps("test2", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic2);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
final ArrayList<String> listenerThreadNames = new ArrayList<>();
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
ConcurrentMessageListenerContainerTests.this.logger.info("manual: " + message);
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
}
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("manual: " + message);
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
});
container.setConcurrency(2);
container.setBeanName("testBatch");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic2);
template.sendDefault(0, "foo");
@@ -223,13 +208,7 @@ public class ConcurrentMessageListenerContainerTests {
template.sendDefault(2, "qux");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(listenerThreadNames).allMatch(new Predicate<String>() {
@Override
public boolean test(String threadName) {
return threadName.contains("-listener-");
}
});
assertThat(listenerThreadNames).allMatch(threadName -> threadName.contains("-listener-"));
container.stop();
this.logger.info("Stop manual");
}
@@ -267,14 +246,9 @@ public class ConcurrentMessageListenerContainerTests {
ConcurrentMessageListenerContainer<Integer, String> container1 =
new ConcurrentMessageListenerContainer<>(cf, container1Props);
final CountDownLatch latch1 = new CountDownLatch(2);
container1Props.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
ConcurrentMessageListenerContainerTests.this.logger.info("auto part: " + message);
latch1.countDown();
}
container1Props.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("auto part: " + message);
latch1.countDown();
});
container1.setBeanName("b1");
container1.start();
@@ -284,14 +258,9 @@ public class ConcurrentMessageListenerContainerTests {
ConcurrentMessageListenerContainer<Integer, String> container2 =
new ConcurrentMessageListenerContainer<>(cf, container2Props);
final CountDownLatch latch2 = new CountDownLatch(2);
container2Props.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
ConcurrentMessageListenerContainerTests.this.logger.info("auto part: " + message);
latch2.countDown();
}
container2Props.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("auto part: " + message);
latch2.countDown();
});
container2.setBeanName("b2");
container2.start();
@@ -299,7 +268,7 @@ public class ConcurrentMessageListenerContainerTests {
assertThat(initialConsumersLatch.await(20, TimeUnit.SECONDS)).isTrue();
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic3);
template.sendDefault(0, 0, "foo");
@@ -321,13 +290,9 @@ public class ConcurrentMessageListenerContainerTests {
new ConcurrentMessageListenerContainer<>(cf, container3Props);
resettingContainer.setBeanName("b3");
final CountDownLatch latch3 = new CountDownLatch(4);
container3Props.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
ConcurrentMessageListenerContainerTests.this.logger.info("auto part e: " + message);
latch3.countDown();
}
container3Props.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("auto part e: " + message);
latch3.countDown();
});
resettingContainer.start();
assertThat(latch3.await(60, TimeUnit.SECONDS)).isTrue();
@@ -336,21 +301,17 @@ public class ConcurrentMessageListenerContainerTests {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
cf = new DefaultKafkaConsumerFactory<>(props);
// reset minusone
// reset minus one
ContainerProperties container4Props = new ContainerProperties(topic1Partition0, topic1Partition1);
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container4Props);
resettingContainer.setBeanName("b4");
container4Props.setRecentOffset(1);
final CountDownLatch latch4 = new CountDownLatch(2);
final AtomicReference<String> receivedMessage = new AtomicReference<>();
container4Props.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
ConcurrentMessageListenerContainerTests.this.logger.info("auto part -1: " + message);
receivedMessage.set(message.value());
latch4.countDown();
}
container4Props.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("auto part -1: " + message);
receivedMessage.set(message.value());
latch4.countDown();
});
resettingContainer.start();
assertThat(latch4.await(60, TimeUnit.SECONDS)).isTrue();
@@ -373,20 +334,15 @@ public class ConcurrentMessageListenerContainerTests {
private void testManualCommitGuts(AckMode ackMode, String topic) throws Exception {
this.logger.info("Start " + ackMode);
Map<String, Object> props = KafkaTestUtils.consumerProps("test" + ackMode, "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
containerProps.setMessageListener(new AcknowledgingMessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ack) {
ConcurrentMessageListenerContainerTests.this.logger.info("manual: " + message);
ack.acknowledge();
latch.countDown();
}
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
ConcurrentMessageListenerContainerTests.this.logger.info("manual: " + message);
ack.acknowledge();
latch.countDown();
});
container.setConcurrency(2);
containerProps.setAckMode(ackMode);
@@ -394,7 +350,7 @@ public class ConcurrentMessageListenerContainerTests {
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic);
template.sendDefault(0, "foo");
@@ -412,7 +368,7 @@ public class ConcurrentMessageListenerContainerTests {
public void testManualCommitExisting() throws Exception {
this.logger.info("Start MANUAL_IMMEDIATE with Existing");
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic7);
template.sendDefault(0, "foo");
@@ -422,36 +378,26 @@ public class ConcurrentMessageListenerContainerTests {
template.flush();
Map<String, Object> props = KafkaTestUtils.consumerProps("testManualExisting", "false", embeddedKafka);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic7);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(8);
containerProps.setMessageListener(new AcknowledgingMessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ack) {
ConcurrentMessageListenerContainerTests.this.logger.info("manualExisting: " + message);
ack.acknowledge();
latch.countDown();
}
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
ConcurrentMessageListenerContainerTests.this.logger.info("manualExisting: " + message);
ack.acknowledge();
latch.countDown();
});
container.setConcurrency(1);
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
container.setBeanName("testManualExisting");
final CountDownLatch commits = new CountDownLatch(8);
final AtomicReference<Exception> exceptionRef = new AtomicReference<Exception>();
containerProps.setCommitCallback(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
commits.countDown();
if (exception != null) {
exceptionRef.compareAndSet(null, exception);
}
final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
containerProps.setCommitCallback((offsets, exception) -> {
commits.countDown();
if (exception != null) {
exceptionRef.compareAndSet(null, exception);
}
});
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
@@ -487,16 +433,11 @@ public class ConcurrentMessageListenerContainerTests {
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(8);
final BitSet bitSet = new BitSet(8);
containerProps.setMessageListener(new AcknowledgingMessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ack) {
ConcurrentMessageListenerContainerTests.this.logger.info("manualExisting: " + message);
ack.acknowledge();
bitSet.set((int) (message.partition() * 4 + message.offset()));
latch.countDown();
}
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
ConcurrentMessageListenerContainerTests.this.logger.info("manualExisting: " + message);
ack.acknowledge();
bitSet.set((int) (message.partition() * 4 + message.offset()));
latch.countDown();
});
container.setConcurrency(1);
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE_SYNC);
@@ -542,13 +483,7 @@ public class ConcurrentMessageListenerContainerTests {
ContainerProperties containerProps = new ContainerProperties(topic1PartitionS);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
}
});
containerProps.setMessageListener((MessageListener<Integer, String>) message -> { });
container.setConcurrency(3);
container.start();
@SuppressWarnings("unchecked")
@@ -566,7 +501,7 @@ public class ConcurrentMessageListenerContainerTests {
public void testListenerException() throws Exception {
this.logger.info("Start exception");
Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic6);
containerProps.setAckCount(23);
ContainerProperties containerProps2 = new ContainerProperties(topic2);
@@ -575,29 +510,18 @@ public class ConcurrentMessageListenerContainerTests {
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
final AtomicBoolean catchError = new AtomicBoolean(false);
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
latch.countDown();
throw new RuntimeException("intended");
}
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
latch.countDown();
throw new RuntimeException("intended");
});
container.setConcurrency(2);
container.setBeanName("testException");
containerProps.setErrorHandler(new ErrorHandler() {
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
catchError.set(true);
}
});
containerProps.setErrorHandler((thrownException, record) -> catchError.set(true));
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic6);
template.sendDefault(0, "foo");
@@ -617,18 +541,14 @@ public class ConcurrentMessageListenerContainerTests {
public void testAckOnErrorRecord() throws Exception {
logger.info("Start ack on error");
Map<String, Object> props = KafkaTestUtils.consumerProps("test9", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
final CountDownLatch latch = new CountDownLatch(4);
ContainerProperties containerProps = new ContainerProperties(topic9);
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
logger.info("auto ack on error: " + message);
latch.countDown();
if (message.value().startsWith("b")) {
throw new RuntimeException();
}
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
logger.info("auto ack on error: " + message);
latch.countDown();
if (message.value().startsWith("b")) {
throw new RuntimeException();
}
});
containerProps.setSyncCommits(true);
@@ -641,7 +561,7 @@ public class ConcurrentMessageListenerContainerTests {
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic9);
template.sendDefault(0, 0, "foo");

View File

@@ -46,6 +46,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.test.rule.KafkaEmbedded;
@@ -202,27 +203,28 @@ public class KafkaMessageListenerContainerTests {
final CountDownLatch latch = new CountDownLatch(18);
final BitSet bitSet = new BitSet(6);
final Map<String, AtomicInteger> faults = new HashMap<>();
containerProps.setMessageListener(new MessageListener<Integer, String>() {
RetryingMessageListenerAdapter<Integer, String> adapter = new RetryingMessageListenerAdapter<>(
new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
logger.info("slow3: " + message);
bitSet.set((int) (message.partition() * 3 + message.offset()));
String key = message.topic() + message.partition() + message.offset();
if (faults.get(key) == null) {
faults.put(key, new AtomicInteger(1));
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
logger.info("slow3: " + message);
bitSet.set((int) (message.partition() * 3 + message.offset()));
String key = message.topic() + message.partition() + message.offset();
if (faults.get(key) == null) {
faults.put(key, new AtomicInteger(1));
}
else {
faults.get(key).incrementAndGet();
}
latch.countDown(); // 3 per = 18
if (faults.get(key).get() < 3) { // succeed on the third attempt
throw new FooEx();
}
}
else {
faults.get(key).incrementAndGet();
}
latch.countDown(); // 3 per = 18
if (faults.get(key).get() < 3) { // succeed on the third attempt
throw new FooEx();
}
}
});
addRetry(containerProps);
}, buildRetry(), null);
containerProps.setMessageListener(adapter);
containerProps.setPauseAfter(100);
container.setBeanName("testSlow3");
container.start();
@@ -260,35 +262,36 @@ public class KafkaMessageListenerContainerTests {
final CountDownLatch latch = new CountDownLatch(18);
final BitSet bitSet = new BitSet(6);
final Map<String, AtomicInteger> faults = new HashMap<>();
containerProps.setMessageListener(new MessageListener<Integer, String>() {
RetryingMessageListenerAdapter<Integer, String> adapter = new RetryingMessageListenerAdapter<>(
new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
logger.info("slow4: " + message);
bitSet.set((int) (message.partition() * 4 + message.offset()));
String key = message.topic() + message.partition() + message.offset();
if (faults.get(key) == null) {
faults.put(key, new AtomicInteger(1));
}
else {
faults.get(key).incrementAndGet();
}
latch.countDown(); // 3 per = 18
if (faults.get(key).get() == 1) {
try {
Thread.sleep(1000);
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
logger.info("slow4: " + message);
bitSet.set((int) (message.partition() * 4 + message.offset()));
String key = message.topic() + message.partition() + message.offset();
if (faults.get(key) == null) {
faults.put(key, new AtomicInteger(1));
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
else {
faults.get(key).incrementAndGet();
}
latch.countDown(); // 3 per = 18
if (faults.get(key).get() == 1) {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (faults.get(key).get() < 3) { // succeed on the third attempt
throw new FooEx();
}
}
if (faults.get(key).get() < 3) { // succeed on the third attempt
throw new FooEx();
}
}
});
addRetry(containerProps);
}, buildRetry(), null);
containerProps.setMessageListener(adapter);
containerProps.setPauseAfter(100);
container.setBeanName("testSlow4");
container.start();
@@ -315,14 +318,14 @@ public class KafkaMessageListenerContainerTests {
logger.info("Stop " + this.testName.getMethodName());
}
private void addRetry(ContainerProperties containerProps) {
private RetryTemplate buildRetry() {
SimpleRetryPolicy policy = new SimpleRetryPolicy(3, Collections.singletonMap(FooEx.class, true));
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(policy);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
retryTemplate.setBackOffPolicy(backOffPolicy);
containerProps.setRetryTemplate(retryTemplate);
return retryTemplate;
}
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {

View File

@@ -344,11 +344,24 @@ This class takes an implementation of `RecordFilterStrategy` where you implement
that a message is a duplicate and should be discarded.
A `FilteringAcknowledgingMessageListenerAdapter` is also provided for wrapping an `AcknowledgingMessageListener`.
This has an additional property `ackDiscarded` which indicates whether the adapter should acknowledge the discarded record.
This has an additional property `ackDiscarded` which indicates whether the adapter should acknowledge the discarded record; it is `true` by default.
When using `@KafkaListener`, set the `RecordFilterStrategy` (and optionally `ackDiscarded`) on the container factory and the listener will be wrapped in the appropriate filtering adapter.
===== Retrying Deliveries
If your listener throws an exception, the default behavior is to invoke the `ErrorHandler`, if configured, or logged otherwise.
To retry deliveries, convenient listener adapters - `RetryingMessageListenerAdapter` and `RetryingAcknowledgingMessageListenerAdapter` are provided, depending on whether you are using a `MessageListener` or an `AcknowledgingMessageListener`.
These can be configured with a `RetryTemplate` and `RecoveryCallback<Void>` - see the https://github.com/spring-projects/spring-retry[spring-retry]
project for information about these components.
If a recovery callback is not provided, the exception is thrown to the container after retries are exhausted.
In that case, the `ErrorHandler` will be invoked, if configured, or logged otherwise.
When using `@KafkaListener`, set the `RetryTemplate` (and optionally `recoveryCallback`) on the container factory and the listener will be wrapped in the appropriate retrying adapter.
==== Serialization/Deserialization and Message Conversion
Apache Kafka provides a high-level API for serializing/deserializing record values as well as their keys.