GH-80: Add DeDuplication Listener Adapters

Resolves #80

CheckStyle Fixes

Polishing - PR Comments

Upgrade to Gradle 2.13
This commit is contained in:
Gary Russell
2016-05-14 14:29:57 -04:00
committed by Artem Bilan
parent da88336b83
commit 5fa753cbb9
17 changed files with 302 additions and 58 deletions

View File

@@ -1,2 +1,11 @@
language: java language: java
jdk: oraclejdk8 jdk: oraclejdk8
install: true
before_cache:
- rm -f $HOME/.gradle/caches/modules-2/modules-2.lock
cache:
directories:
- $HOME/.gradle/caches/
- $HOME/.gradle/wrapper/
script:
- ./gradlew check --no-daemon

View File

@@ -1 +1,2 @@
version=1.0.0.BUILD-SNAPSHOT version=1.0.0.BUILD-SNAPSHOT
org.gradle.daemon=true

Binary file not shown.

View File

@@ -1,6 +1,6 @@
#Wed Mar 30 18:31:11 EDT 2016 #Mon May 16 18:24:00 EDT 2016
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.12-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-2.13-bin.zip

46
gradlew vendored
View File

@@ -6,12 +6,30 @@
## ##
############################################################################## ##############################################################################
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. # Attempt to set APP_HOME
DEFAULT_JVM_OPTS="" # Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
APP_NAME="Gradle" APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"` APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS=""
# Use the maximum available, or set MAX_FD != -1 to use that value. # Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum" MAX_FD="maximum"
@@ -30,6 +48,7 @@ die ( ) {
cygwin=false cygwin=false
msys=false msys=false
darwin=false darwin=false
nonstop=false
case "`uname`" in case "`uname`" in
CYGWIN* ) CYGWIN* )
cygwin=true cygwin=true
@@ -40,26 +59,11 @@ case "`uname`" in
MINGW* ) MINGW* )
msys=true msys=true
;; ;;
NONSTOP* )
nonstop=true
;;
esac esac
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM. # Determine the Java command to use to start the JVM.
@@ -85,7 +89,7 @@ location of your Java installation."
fi fi
# Increase the maximum file descriptors if we can. # Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n` MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then

6
gradlew.bat vendored
View File

@@ -8,14 +8,14 @@
@rem Set local scope for the variables with windows NT shell @rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal if "%OS%"=="Windows_NT" setlocal
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=
set DIRNAME=%~dp0 set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=. if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0 set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME% set APP_HOME=%DIRNAME%
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=
@rem Find java.exe @rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome if defined JAVA_HOME goto findJavaFromJavaHome

View File

@@ -24,53 +24,57 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target; import java.lang.annotation.Target;
import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.MessageMapping;
/** /**
* Annotation that marks a method to be the target of a Kafka message * Annotation that marks a method to be the target of a Kafka message listener on the
* listener on the specified topics. * specified topics.
* *
* The {@link #containerFactory()} * The {@link #containerFactory()} identifies the
* identifies the {@link org.springframework.kafka.config.KafkaListenerContainerFactory * {@link org.springframework.kafka.config.KafkaListenerContainerFactory
* KafkaListenerContainerFactory} to use to build the Kafka listener container. If not * KafkaListenerContainerFactory} to use to build the Kafka listener container. If not
* set, a <em>default</em> container factory is assumed to be available with a bean * set, a <em>default</em> container factory is assumed to be available with a bean name
* name of {@code kafkaListenerContainerFactory} unless an explicit default has been * of {@code kafkaListenerContainerFactory} unless an explicit default has been provided
* provided through configuration. * through configuration.
* *
* <p>Processing of {@code @KafkaListener} annotations is performed by * <p>
* registering a {@link KafkaListenerAnnotationBeanPostProcessor}. This can be * Processing of {@code @KafkaListener} annotations is performed by registering a
* done manually or, more conveniently, through {@link EnableKafka} annotation. * {@link KafkaListenerAnnotationBeanPostProcessor}. This can be done manually or, more
* conveniently, through {@link EnableKafka} annotation.
* *
* <p>Annotated methods are allowed to have flexible signatures similar to what * <p>
* Annotated methods are allowed to have flexible signatures similar to what
* {@link MessageMapping} provides, that is * {@link MessageMapping} provides, that is
* <ul> * <ul>
* <li>{@link org.apache.kafka.clients.consumer.ConsumerRecord} to * <li>{@link org.apache.kafka.clients.consumer.ConsumerRecord} to access to the raw Kafka
* access to the raw Kafka message</li> * message</li>
* <li>{@link org.springframework.kafka.support.Acknowledgment} to manually ack</li> * <li>{@link org.springframework.kafka.support.Acknowledgment} to manually ack</li>
* <li>{@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated method * <li>{@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated
* arguments including the support of validation</li> * method arguments including the support of validation</li>
* <li>{@link org.springframework.messaging.handler.annotation.Header @Header}-annotated method * <li>{@link org.springframework.messaging.handler.annotation.Header @Header}-annotated
* arguments to extract a specific header value, defined by * method arguments to extract a specific header value, defined by
* {@link org.springframework.kafka.support.KafkaHeaders KafkaHeaders}</li> * {@link org.springframework.kafka.support.KafkaHeaders KafkaHeaders}</li>
* <li>{@link org.springframework.messaging.handler.annotation.Headers @Headers}-annotated * <li>{@link org.springframework.messaging.handler.annotation.Headers @Headers}-annotated
* argument that must also be assignable to {@link java.util.Map} for getting access to all * argument that must also be assignable to {@link java.util.Map} for getting access to
* headers.</li> * all headers.</li>
* <li>{@link org.springframework.messaging.MessageHeaders MessageHeaders} arguments for * <li>{@link org.springframework.messaging.MessageHeaders MessageHeaders} arguments for
* getting access to all headers.</li> * getting access to all headers.</li>
* <li>{@link org.springframework.messaging.support.MessageHeaderAccessor MessageHeaderAccessor} * <li>{@link org.springframework.messaging.support.MessageHeaderAccessor
* for convenient access to all method arguments.</li> * MessageHeaderAccessor} for convenient access to all method arguments.</li>
* </ul> * </ul>
* *
* <p>When defined at the method level, a listener container is created for each method. The * <p>When defined at the method level, a listener container is created for each method.
* {@link MessageListener} is a {@link MessagingMessageListenerAdapter}, configured with a * The {@link MessageListener} is a
* {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}. * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter},
* configured with a {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}.
* *
* <p>When defined at the class level, a single message listener container is used to service * <p>When defined at the class level, a single message listener container is used to
* all methods annotated with {@code @KafkaHandler}. Method signatures of such annotated * service all methods annotated with {@code @KafkaHandler}. Method signatures of such
* methods must not cause any ambiguity such that a single method can be resolved for a * annotated methods must not cause any ambiguity such that a single method can be
* particular inbound message. The {@link MessagingMessageListenerAdapter} is configured with * resolved for a particular inbound message. The
* a {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}. * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter} is
* configured with a
* {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}.
* *
* @author Gary Russell * @author Gary Russell
* *

View File

@@ -23,6 +23,7 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.adapter.DeDuplicationStrategy;
import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.MessageConverter;
/** /**
@@ -57,6 +58,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
private MessageConverter messageConverter; private MessageConverter messageConverter;
private DeDuplicationStrategy<K, V> deDuplicationStrategy;
/** /**
* Specify a {@link ConsumerFactory} to use. * Specify a {@link ConsumerFactory} to use.
* @param consumerFactory The consumer factory. * @param consumerFactory The consumer factory.
@@ -140,6 +143,11 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
this.messageConverter = messageConverter; this.messageConverter = messageConverter;
} }
public void setDeDuplicationStrategy(DeDuplicationStrategy<K, V> deDuplicationStrategy) {
this.deDuplicationStrategy = deDuplicationStrategy;
}
@SuppressWarnings("unchecked")
@Override @Override
public C createListenerContainer(KafkaListenerEndpoint endpoint) { public C createListenerContainer(KafkaListenerEndpoint endpoint) {
C instance = createContainerInstance(endpoint); C instance = createContainerInstance(endpoint);
@@ -169,6 +177,9 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
instance.setPollTimeout(this.pollTimeout); instance.setPollTimeout(this.pollTimeout);
} }
if (this.deDuplicationStrategy != null && endpoint instanceof AbstractKafkaListenerEndpoint) {
((AbstractKafkaListenerEndpoint<K, V>) endpoint).setDeDuplicationStrategy(this.deDuplicationStrategy);
}
endpoint.setupListenerContainer(instance, this.messageConverter); endpoint.setupListenerContainer(instance, this.messageConverter);
initializeContainer(instance); initializeContainer(instance);

View File

@@ -33,6 +33,7 @@ import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.DeDuplicationStrategy;
import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@@ -67,6 +68,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
private String group; private String group;
private DeDuplicationStrategy<K, V> deDuplicationStrategy;
@Override @Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException { public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
@@ -193,6 +196,18 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
} }
} }
protected DeDuplicationStrategy<K, V> getDeDuplicationStrategy() {
return this.deDuplicationStrategy;
}
/**
* Set a {@link DeDuplicationStrategy} implementation.
* @param deDuplicationStrategy the strategy implementation.
*/
public void setDeDuplicationStrategy(DeDuplicationStrategy<K, V> deDuplicationStrategy) {
this.deDuplicationStrategy = deDuplicationStrategy;
}
@Override @Override
public void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter) { public void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter) {
setupMessageListener(listenerContainer, messageConverter); setupMessageListener(listenerContainer, messageConverter);

View File

@@ -94,6 +94,9 @@ public class MethodKafkaListenerEndpoint<K, V> extends AbstractKafkaListenerEndp
Assert.state(this.messageHandlerMethodFactory != null, Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set"); "Could not create message listener - MessageHandlerMethodFactory not set");
MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(); MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance();
if (getDeDuplicationStrategy() != null) {
messageListener.setDeDuplicationStrategy(getDeDuplicationStrategy());
}
messageListener.setHandlerMethod(configureListenerAdapter(messageListener)); messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
if (messageConverter != null) { if (messageConverter != null) {
messageListener.setMessageConverter(messageConverter); messageListener.setMessageConverter(messageConverter);

View File

@@ -458,8 +458,8 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
ackImmediate(record); ackImmediate(record);
} }
else { else {
throw new IllegalStateException("AckMode must be MANUAL or MANUAL_IMMEDIATE " throw new IllegalStateException("AckMode must be MANUAL, "
+ "for manual acks"); + "MANUAL_IMMEDIATE, or MANUAL_IMMEDIATE_SYNC for manual acks");
} }
} }

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.listener.adapter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.util.Assert;
/**
* An abstract message listener adapter that implements de-duplication logic
* via a {@link DeDuplicationStrategy}.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public abstract class AbstractDeDuplicatingMessageListener<K, V> {
private final DeDuplicationStrategy<K, V> deDupStrategy;
protected AbstractDeDuplicatingMessageListener(DeDuplicationStrategy<K, V> deDupStrategy) {
Assert.notNull(deDupStrategy, "'deDupStrategy' cannot be null");
this.deDupStrategy = deDupStrategy;
}
protected boolean isDuplicate(ConsumerRecord<K, V> consumerRecord) {
return this.deDupStrategy.isDuplicate(consumerRecord);
}
}

View File

@@ -0,0 +1,51 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.listener.adapter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
/**
* A {@link MessageListener} adapter that implements de-duplication logic
* via a DeDuplicationStrategy.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public class DeDuplicatingMessageListenerAdapter<K, V> extends AbstractDeDuplicatingMessageListener<K, V>
implements MessageListener<K, V> {
private final MessageListener<K, V> delegate;
public DeDuplicatingMessageListenerAdapter(DeDuplicationStrategy<K, V> deDupStrategy,
MessageListener<K, V> delegate) {
super(deDupStrategy);
this.delegate = delegate;
}
@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord) {
if (!isDuplicate(consumerRecord)) {
this.delegate.onMessage(consumerRecord);
}
}
}

View File

@@ -0,0 +1,40 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.listener.adapter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* Implementations of this interface can signal that a message about
* to be delivered to a message listener is a duplicate.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
*/
public interface DeDuplicationStrategy<K, V> {
/**
* Return true if the record is a duplicate and should be discarded.
* @param consumerRecord the record.
* @return true to discard.
*/
boolean isDuplicate(ConsumerRecord<K, V> consumerRecord);
}

View File

@@ -59,6 +59,8 @@ public class MessagingMessageListenerAdapter<K, V> extends AbstractAdaptableMess
private MessageConverter messageConverter = new MessagingMessageConverter(); private MessageConverter messageConverter = new MessagingMessageConverter();
private DeDuplicationStrategy<K, V> deDuplicationStrategy;
public MessagingMessageListenerAdapter(Method method) { public MessagingMessageListenerAdapter(Method method) {
this.inferredType = determineInferredType(method); this.inferredType = determineInferredType(method);
@@ -101,6 +103,18 @@ public class MessagingMessageListenerAdapter<K, V> extends AbstractAdaptableMess
invokeHandler(record, acknowledgment, message); invokeHandler(record, acknowledgment, message);
} }
protected DeDuplicationStrategy<K, V> getDeDuplicationStrategy() {
return this.deDuplicationStrategy;
}
/**
* Set a {@link DeDuplicationStrategy} implementation.
* @param deDuplicationStrategy the strategy implementation.
*/
public void setDeDuplicationStrategy(DeDuplicationStrategy<K, V> deDuplicationStrategy) {
this.deDuplicationStrategy = deDuplicationStrategy;
}
protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment) { protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment) {
return getMessageConverter().toMessage(record, acknowledgment, this.inferredType); return getMessageConverter().toMessage(record, acknowledgment, this.inferredType);
} }
@@ -114,6 +128,9 @@ public class MessagingMessageListenerAdapter<K, V> extends AbstractAdaptableMess
* @return the result of invocation. * @return the result of invocation.
*/ */
private Object invokeHandler(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Message<?> message) { private Object invokeHandler(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Message<?> message) {
if (this.deDuplicationStrategy != null && this.deDuplicationStrategy.isDuplicate(record)) {
return null;
}
try { try {
return this.handlerMethod.invoke(message, record, acknowledgment); return this.handlerMethod.invoke(message, record, acknowledgment);
} }

View File

@@ -47,6 +47,7 @@ import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.DeDuplicationStrategy;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.StringJsonMessageConverter; import org.springframework.kafka.support.converter.StringJsonMessageConverter;
@@ -95,6 +96,9 @@ public class EnableKafkaIntegrationTests {
@Autowired @Autowired
public KafkaListenerEndpointRegistry registry; public KafkaListenerEndpointRegistry registry;
@Autowired
private DeDupImpl deDup;
@Test @Test
public void testSimple() throws Exception { public void testSimple() throws Exception {
template.send("annotated1", 0, "foo"); template.send("annotated1", 0, "foo");
@@ -130,6 +134,7 @@ public class EnableKafkaIntegrationTests {
template.flush(); template.flush();
assertThat(this.listener.latch7.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.listener.latch7.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.deDup.called).isTrue();
} }
@Test @Test
@@ -195,9 +200,15 @@ public class EnableKafkaIntegrationTests {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>(); new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory()); factory.setConsumerFactory(consumerFactory());
factory.setDeDuplicationStrategy(deDup());
return factory; return factory;
} }
@Bean
public DeDupImpl deDup() {
return new DeDupImpl();
}
@Bean @Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaJsonListenerContainerFactory() { kafkaJsonListenerContainerFactory() {
@@ -462,4 +473,16 @@ public class EnableKafkaIntegrationTests {
} }
public static class DeDupImpl implements DeDuplicationStrategy<Integer, String> {
private boolean called;
@Override
public boolean isDuplicate(ConsumerRecord<Integer, String> consumerRecord) {
called = true;
return false;
}
}
} }

View File

@@ -330,3 +330,23 @@ public void listen(@Payload String foo,
... ...
} }
---- ----
===== Handling Duplicates
In certain scenarios, such as rebalancing, a message may be redelivered that has already been processed.
The framework cannot know whether such a message has been processed or not, that is an application-level
function.
This is known as the http://www.enterpriseintegrationpatterns.com/patterns/messaging/IdempotentReceiver.html[Idempotent
Receiver] pattern and Spring Integration provides an
http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#idempotent-receiver
[implementation thereof].
The Spring for Apache Kafka project also provides some assistance by means of the `DeDuplicatingMessageListenerAdapter`
classe, which can wrap your `MessageListener`.
This class takes an implementation of `DeDuplicationStrategy` where you implement the `isDuplicate` method to signal
that a message is a duplicate and should be discarded.
Similarly, when using `@KafkaListener`, the `DeDuplicationStrategy` can be injected into the container factory.
A wrapper is not provided for the `AcknowledgingMessageListener` because you might need to acknowledge the duplicated
message.