diff --git a/.travis.yml b/.travis.yml
index c0f28cfa..a176eac9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,2 +1,11 @@
language: java
jdk: oraclejdk8
+install: true
+before_cache:
+ - rm -f $HOME/.gradle/caches/modules-2/modules-2.lock
+cache:
+ directories:
+ - $HOME/.gradle/caches/
+ - $HOME/.gradle/wrapper/
+script:
+ - ./gradlew check --no-daemon
diff --git a/gradle.properties b/gradle.properties
index bebfcbcf..2b2996bc 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1 +1,2 @@
version=1.0.0.BUILD-SNAPSHOT
+org.gradle.daemon=true
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 2c6137b8..ca78035e 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index cd2de496..323f0727 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Wed Mar 30 18:31:11 EDT 2016
+#Mon May 16 18:24:00 EDT 2016
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-2.12-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.13-bin.zip
diff --git a/gradlew b/gradlew
index 9d82f789..27309d92 100755
--- a/gradlew
+++ b/gradlew
@@ -6,12 +6,30 @@
##
##############################################################################
-# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-DEFAULT_JVM_OPTS=""
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`"/$link"
+ fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/" >/dev/null
+APP_HOME="`pwd -P`"
+cd "$SAVED" >/dev/null
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS=""
+
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
@@ -30,6 +48,7 @@ die ( ) {
cygwin=false
msys=false
darwin=false
+nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
@@ -40,26 +59,11 @@ case "`uname`" in
MINGW* )
msys=true
;;
+ NONSTOP* )
+ nonstop=true
+ ;;
esac
-# Attempt to set APP_HOME
-# Resolve links: $0 may be a link
-PRG="$0"
-# Need this for relative symlinks.
-while [ -h "$PRG" ] ; do
- ls=`ls -ld "$PRG"`
- link=`expr "$ls" : '.*-> \(.*\)$'`
- if expr "$link" : '/.*' > /dev/null; then
- PRG="$link"
- else
- PRG=`dirname "$PRG"`"/$link"
- fi
-done
-SAVED="`pwd`"
-cd "`dirname \"$PRG\"`/" >/dev/null
-APP_HOME="`pwd -P`"
-cd "$SAVED" >/dev/null
-
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
@@ -85,7 +89,7 @@ location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
-if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
+if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
diff --git a/gradlew.bat b/gradlew.bat
index 72d362da..f6d5974e 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -8,14 +8,14 @@
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
-@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-set DEFAULT_JVM_OPTS=
-
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+set DEFAULT_JVM_OPTS=
+
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java
index 7643d4ea..9a2403d6 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java
@@ -24,53 +24,57 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.kafka.listener.MessageListener;
-import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.messaging.handler.annotation.MessageMapping;
/**
- * Annotation that marks a method to be the target of a Kafka message
- * listener on the specified topics.
+ * Annotation that marks a method to be the target of a Kafka message listener on the
+ * specified topics.
*
- * The {@link #containerFactory()}
- * identifies the {@link org.springframework.kafka.config.KafkaListenerContainerFactory
+ * The {@link #containerFactory()} identifies the
+ * {@link org.springframework.kafka.config.KafkaListenerContainerFactory
* KafkaListenerContainerFactory} to use to build the Kafka listener container. If not
- * set, a default container factory is assumed to be available with a bean
- * name of {@code kafkaListenerContainerFactory} unless an explicit default has been
- * provided through configuration.
+ * set, a default container factory is assumed to be available with a bean name
+ * of {@code kafkaListenerContainerFactory} unless an explicit default has been provided
+ * through configuration.
*
- *
Processing of {@code @KafkaListener} annotations is performed by
- * registering a {@link KafkaListenerAnnotationBeanPostProcessor}. This can be
- * done manually or, more conveniently, through {@link EnableKafka} annotation.
+ *
+ * Processing of {@code @KafkaListener} annotations is performed by registering a
+ * {@link KafkaListenerAnnotationBeanPostProcessor}. This can be done manually or, more
+ * conveniently, through {@link EnableKafka} annotation.
*
- *
Annotated methods are allowed to have flexible signatures similar to what
+ *
+ * Annotated methods are allowed to have flexible signatures similar to what
* {@link MessageMapping} provides, that is
*
- * - {@link org.apache.kafka.clients.consumer.ConsumerRecord} to
- * access to the raw Kafka message
+ * - {@link org.apache.kafka.clients.consumer.ConsumerRecord} to access to the raw Kafka
+ * message
* - {@link org.springframework.kafka.support.Acknowledgment} to manually ack
- * - {@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated method
- * arguments including the support of validation
- * - {@link org.springframework.messaging.handler.annotation.Header @Header}-annotated method
- * arguments to extract a specific header value, defined by
+ *
- {@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated
+ * method arguments including the support of validation
+ * - {@link org.springframework.messaging.handler.annotation.Header @Header}-annotated
+ * method arguments to extract a specific header value, defined by
* {@link org.springframework.kafka.support.KafkaHeaders KafkaHeaders}
* - {@link org.springframework.messaging.handler.annotation.Headers @Headers}-annotated
- * argument that must also be assignable to {@link java.util.Map} for getting access to all
- * headers.
+ * argument that must also be assignable to {@link java.util.Map} for getting access to
+ * all headers.
* - {@link org.springframework.messaging.MessageHeaders MessageHeaders} arguments for
* getting access to all headers.
- * - {@link org.springframework.messaging.support.MessageHeaderAccessor MessageHeaderAccessor}
- * for convenient access to all method arguments.
+ * - {@link org.springframework.messaging.support.MessageHeaderAccessor
+ * MessageHeaderAccessor} for convenient access to all method arguments.
*
*
- * When defined at the method level, a listener container is created for each method. The
- * {@link MessageListener} is a {@link MessagingMessageListenerAdapter}, configured with a
- * {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}.
+ *
When defined at the method level, a listener container is created for each method.
+ * The {@link MessageListener} is a
+ * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter},
+ * configured with a {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}.
*
- *
When defined at the class level, a single message listener container is used to service
- * all methods annotated with {@code @KafkaHandler}. Method signatures of such annotated
- * methods must not cause any ambiguity such that a single method can be resolved for a
- * particular inbound message. The {@link MessagingMessageListenerAdapter} is configured with
- * a {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}.
+ *
When defined at the class level, a single message listener container is used to
+ * service all methods annotated with {@code @KafkaHandler}. Method signatures of such
+ * annotated methods must not cause any ambiguity such that a single method can be
+ * resolved for a particular inbound message. The
+ * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter} is
+ * configured with a
+ * {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}.
*
* @author Gary Russell
*
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
index fda96fe2..a1ba9427 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
@@ -23,6 +23,7 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ErrorHandler;
+import org.springframework.kafka.listener.adapter.DeDuplicationStrategy;
import org.springframework.kafka.support.converter.MessageConverter;
/**
@@ -57,6 +58,8 @@ public abstract class AbstractKafkaListenerContainerFactory deDuplicationStrategy;
+
/**
* Specify a {@link ConsumerFactory} to use.
* @param consumerFactory The consumer factory.
@@ -140,6 +143,11 @@ public abstract class AbstractKafkaListenerContainerFactory deDuplicationStrategy) {
+ this.deDuplicationStrategy = deDuplicationStrategy;
+ }
+
+ @SuppressWarnings("unchecked")
@Override
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
C instance = createContainerInstance(endpoint);
@@ -169,6 +177,9 @@ public abstract class AbstractKafkaListenerContainerFactory) endpoint).setDeDuplicationStrategy(this.deDuplicationStrategy);
+ }
endpoint.setupListenerContainer(instance, this.messageConverter);
initializeContainer(instance);
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java
index 160c6085..c2fdadaf 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java
@@ -33,6 +33,7 @@ import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
+import org.springframework.kafka.listener.adapter.DeDuplicationStrategy;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.util.Assert;
@@ -67,6 +68,8 @@ public abstract class AbstractKafkaListenerEndpoint
private String group;
+ private DeDuplicationStrategy deDuplicationStrategy;
+
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
@@ -193,6 +196,18 @@ public abstract class AbstractKafkaListenerEndpoint
}
}
+ protected DeDuplicationStrategy getDeDuplicationStrategy() {
+ return this.deDuplicationStrategy;
+ }
+
+ /**
+ * Set a {@link DeDuplicationStrategy} implementation.
+ * @param deDuplicationStrategy the strategy implementation.
+ */
+ public void setDeDuplicationStrategy(DeDuplicationStrategy deDuplicationStrategy) {
+ this.deDuplicationStrategy = deDuplicationStrategy;
+ }
+
@Override
public void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter) {
setupMessageListener(listenerContainer, messageConverter);
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java
index 1decc744..6afa71e0 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java
@@ -94,6 +94,9 @@ public class MethodKafkaListenerEndpoint extends AbstractKafkaListenerEndp
Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set");
MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
+ if (getDeDuplicationStrategy() != null) {
+ messageListener.setDeDuplicationStrategy(getDeDuplicationStrategy());
+ }
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
if (messageConverter != null) {
messageListener.setMessageConverter(messageConverter);
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index be98ba15..d10afd2c 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -458,8 +458,8 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener
ackImmediate(record);
}
else {
- throw new IllegalStateException("AckMode must be MANUAL or MANUAL_IMMEDIATE "
- + "for manual acks");
+ throw new IllegalStateException("AckMode must be MANUAL, "
+ + "MANUAL_IMMEDIATE, or MANUAL_IMMEDIATE_SYNC for manual acks");
}
}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDeDuplicatingMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDeDuplicatingMessageListener.java
new file mode 100644
index 00000000..79fb0f3c
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDeDuplicatingMessageListener.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener.adapter;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.springframework.util.Assert;
+
+/**
+ * An abstract message listener adapter that implements de-duplication logic
+ * via a {@link DeDuplicationStrategy}.
+ *
+ * @param the key type.
+ * @param the value type.
+ *
+ * @author Gary Russell
+ *
+ */
+public abstract class AbstractDeDuplicatingMessageListener {
+
+ private final DeDuplicationStrategy deDupStrategy;
+
+ protected AbstractDeDuplicatingMessageListener(DeDuplicationStrategy deDupStrategy) {
+ Assert.notNull(deDupStrategy, "'deDupStrategy' cannot be null");
+ this.deDupStrategy = deDupStrategy;
+ }
+
+ protected boolean isDuplicate(ConsumerRecord consumerRecord) {
+ return this.deDupStrategy.isDuplicate(consumerRecord);
+ }
+
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicatingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicatingMessageListenerAdapter.java
new file mode 100644
index 00000000..73050e3d
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicatingMessageListenerAdapter.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener.adapter;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.springframework.kafka.listener.MessageListener;
+
+/**
+ * A {@link MessageListener} adapter that implements de-duplication logic
+ * via a DeDuplicationStrategy.
+ *
+ * @param the key type.
+ * @param the value type.
+ *
+ * @author Gary Russell
+ *
+ */
+public class DeDuplicatingMessageListenerAdapter extends AbstractDeDuplicatingMessageListener
+ implements MessageListener {
+
+ private final MessageListener delegate;
+
+ public DeDuplicatingMessageListenerAdapter(DeDuplicationStrategy deDupStrategy,
+ MessageListener delegate) {
+ super(deDupStrategy);
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void onMessage(ConsumerRecord consumerRecord) {
+ if (!isDuplicate(consumerRecord)) {
+ this.delegate.onMessage(consumerRecord);
+ }
+ }
+
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicationStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicationStrategy.java
new file mode 100644
index 00000000..dfaea7b8
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicationStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener.adapter;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Implementations of this interface can signal that a message about
+ * to be delivered to a message listener is a duplicate.
+ *
+ * @param the key type.
+ * @param the value type.
+ *
+ * @author Gary Russell
+ *
+ */
+public interface DeDuplicationStrategy {
+
+ /**
+ * Return true if the record is a duplicate and should be discarded.
+ * @param consumerRecord the record.
+ * @return true to discard.
+ */
+ boolean isDuplicate(ConsumerRecord consumerRecord);
+
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java
index 1c537436..b3b507d4 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java
@@ -59,6 +59,8 @@ public class MessagingMessageListenerAdapter extends AbstractAdaptableMess
private MessageConverter messageConverter = new MessagingMessageConverter();
+ private DeDuplicationStrategy deDuplicationStrategy;
+
public MessagingMessageListenerAdapter(Method method) {
this.inferredType = determineInferredType(method);
@@ -101,6 +103,18 @@ public class MessagingMessageListenerAdapter extends AbstractAdaptableMess
invokeHandler(record, acknowledgment, message);
}
+ protected DeDuplicationStrategy getDeDuplicationStrategy() {
+ return this.deDuplicationStrategy;
+ }
+
+ /**
+ * Set a {@link DeDuplicationStrategy} implementation.
+ * @param deDuplicationStrategy the strategy implementation.
+ */
+ public void setDeDuplicationStrategy(DeDuplicationStrategy deDuplicationStrategy) {
+ this.deDuplicationStrategy = deDuplicationStrategy;
+ }
+
protected Message> toMessagingMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
return getMessageConverter().toMessage(record, acknowledgment, this.inferredType);
}
@@ -114,6 +128,9 @@ public class MessagingMessageListenerAdapter extends AbstractAdaptableMess
* @return the result of invocation.
*/
private Object invokeHandler(ConsumerRecord record, Acknowledgment acknowledgment, Message> message) {
+ if (this.deDuplicationStrategy != null && this.deDuplicationStrategy.isDuplicate(record)) {
+ return null;
+ }
try {
return this.handlerMethod.invoke(message, record, acknowledgment);
}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
index 16eb3cdd..84d2cb03 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
@@ -47,6 +47,7 @@ import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
+import org.springframework.kafka.listener.adapter.DeDuplicationStrategy;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
@@ -95,6 +96,9 @@ public class EnableKafkaIntegrationTests {
@Autowired
public KafkaListenerEndpointRegistry registry;
+ @Autowired
+ private DeDupImpl deDup;
+
@Test
public void testSimple() throws Exception {
template.send("annotated1", 0, "foo");
@@ -130,6 +134,7 @@ public class EnableKafkaIntegrationTests {
template.flush();
assertThat(this.listener.latch7.await(10, TimeUnit.SECONDS)).isTrue();
+ assertThat(this.deDup.called).isTrue();
}
@Test
@@ -195,9 +200,15 @@ public class EnableKafkaIntegrationTests {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
+ factory.setDeDuplicationStrategy(deDup());
return factory;
}
+ @Bean
+ public DeDupImpl deDup() {
+ return new DeDupImpl();
+ }
+
@Bean
public KafkaListenerContainerFactory>
kafkaJsonListenerContainerFactory() {
@@ -462,4 +473,16 @@ public class EnableKafkaIntegrationTests {
}
+ public static class DeDupImpl implements DeDuplicationStrategy {
+
+ private boolean called;
+
+ @Override
+ public boolean isDuplicate(ConsumerRecord consumerRecord) {
+ called = true;
+ return false;
+ }
+
+ }
+
}
diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc
index 2c92f2a7..1abc71b0 100644
--- a/src/reference/asciidoc/kafka.adoc
+++ b/src/reference/asciidoc/kafka.adoc
@@ -330,3 +330,23 @@ public void listen(@Payload String foo,
...
}
----
+
+===== Handling Duplicates
+
+In certain scenarios, such as rebalancing, a message may be redelivered that has already been processed.
+The framework cannot know whether such a message has been processed or not, that is an application-level
+function.
+This is known as the http://www.enterpriseintegrationpatterns.com/patterns/messaging/IdempotentReceiver.html[Idempotent
+Receiver] pattern and Spring Integration provides an
+http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#idempotent-receiver
+[implementation thereof].
+
+The Spring for Apache Kafka project also provides some assistance by means of the `DeDuplicatingMessageListenerAdapter`
+classe, which can wrap your `MessageListener`.
+This class takes an implementation of `DeDuplicationStrategy` where you implement the `isDuplicate` method to signal
+that a message is a duplicate and should be discarded.
+
+Similarly, when using `@KafkaListener`, the `DeDuplicationStrategy` can be injected into the container factory.
+
+A wrapper is not provided for the `AcknowledgingMessageListener` because you might need to acknowledge the duplicated
+message.