From 5fa753cbb99057cd61a07c5ea85e2d32dc5db4ce Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Sat, 14 May 2016 14:29:57 -0400 Subject: [PATCH] GH-80: Add DeDuplication Listener Adapters Resolves #80 CheckStyle Fixes Polishing - PR Comments Upgrade to Gradle 2.13 --- .travis.yml | 9 +++ gradle.properties | 1 + gradle/wrapper/gradle-wrapper.jar | Bin 53639 -> 53556 bytes gradle/wrapper/gradle-wrapper.properties | 4 +- gradlew | 46 +++++++------ gradlew.bat | 6 +- .../kafka/annotation/KafkaListener.java | 64 ++++++++++-------- ...AbstractKafkaListenerContainerFactory.java | 11 +++ .../config/AbstractKafkaListenerEndpoint.java | 15 ++++ .../config/MethodKafkaListenerEndpoint.java | 3 + .../KafkaMessageListenerContainer.java | 4 +- .../AbstractDeDuplicatingMessageListener.java | 46 +++++++++++++ .../DeDuplicatingMessageListenerAdapter.java | 51 ++++++++++++++ .../adapter/DeDuplicationStrategy.java | 40 +++++++++++ .../MessagingMessageListenerAdapter.java | 17 +++++ .../EnableKafkaIntegrationTests.java | 23 +++++++ src/reference/asciidoc/kafka.adoc | 20 ++++++ 17 files changed, 302 insertions(+), 58 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDeDuplicatingMessageListener.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicatingMessageListenerAdapter.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicationStrategy.java diff --git a/.travis.yml b/.travis.yml index c0f28cfa..a176eac9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,2 +1,11 @@ language: java jdk: oraclejdk8 +install: true +before_cache: + - rm -f $HOME/.gradle/caches/modules-2/modules-2.lock +cache: + directories: + - $HOME/.gradle/caches/ + - $HOME/.gradle/wrapper/ +script: + - ./gradlew check --no-daemon diff --git a/gradle.properties b/gradle.properties index bebfcbcf..2b2996bc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1,2 @@ version=1.0.0.BUILD-SNAPSHOT +org.gradle.daemon=true diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 2c6137b87896c8f70315ae454e00a969ef5f6019..ca78035ef0501d802d4fc55381ef2d5c3ce0ec6e 100644 GIT binary patch delta 3268 zcmZWr2|SePAAjc^S7GF)QG>~mbDWWljWQ!gNJ2{F`n#{flx1jXU8N_BVo{_+EaTXA z?dBYeyAnn=i$qE~eNx%~JJ0;L{m<@vKJ)qg<~!f-?|Pp1d7q4{_=YQZac6tHz(xcS z5kcY&it@zOMKD7XerYC~XW=FNCgxQP4*8oBiczks;JKz>6Pi^D*8nZtA6&~Mps^?f z&Imk(K!jig1emfS(n1i?CQP7`7?&>`bp6F7n-)myc4=v$ROUpxNMq z0!mMI(kk`FV<*+Qhiw1)_ngqAQ!g6tHjX*1uYEjcg=;w*nY7SVk-DAqjI!8q!a=+6 zRU0`c@(S~oyBtYwI(7-wl6?tPlG+~mGv7a& zyCxrM`H1z4Yl}wLN2#^lFLRP~FW&3l zqd7mGUL+!F?C2TD8gU5PYaeiev9_VZxHuH8X>H9+ImPCbH6{*H-X#a03_a3Z(D?46 zUYcjLZsV1S^kVs(2#x>v3#32Qv5?$1&avGm5kBd)^Y&!USs7JXXxiUj|1* z7?`~n6q__t_*-Fq<1C$V_J=CBuQUs8{&ZGe!0X&<(Ws%_a;0*OUozhNu}jtlMopEQ zmu<@}RYt`|8&*WEOAKRqwzHuA+VQYOwZR(p0ms0c2k8-_-i1x!eall$&jKUa_e>X- zcGVnyd1OB3V3+!`U7l;5-~o028?;%nU{j`;lAx#TYWb~k7D3;3{%l!U>JeGeth8+D zjFD{VEF)8Yr82eF?R0WZq=3-=-(6AN`~r`!bt`kiBV=l|CqJ1kIVa-$m-=Ei7Mq{U zf1kf6VP?BFIjuL&!HtWCg5T=q|g>(JI^4=-~D^*;k~wh{Qf%MGx@O zk`A(Z70^`DfAf!RAJRuv3mR|jUktQHJ=h{2cY-5BGGp;VswSQ>YIEP|eU|u0Ei%df zRmgiVR5_}W^;%l6^NraM$uOlLi%#kmEgo$fsy#1(f1>7Ptt3btnzrnGnMkt=Ncw$& zn&PU9P*Qg1iYv5Jl3l!YeoMPdn{KvwaQj7v$&SLbkJN1oi}1_AS27ovu)uM{&=a=gJ( z`KpBCJK=TT4}!JRhqCAGy!{$1ESJz1uKTzN2AiTsf%ij z4Y|>8XM()jLUq1XCq0fGqu5hibe!7a^Er*DaXwY=ZI7OMp$%x#fGuEf~jl_RGzvyD>o9`nC?^06V z=aw2|-@D`Wz@r^M&X7v$8eKnFl_gGQ5bIw=#5Cn+FWBy1aelra@%MnSCIQ`72aj3i z4d1gU*8#@sA(h#;c!|EPY>*Fp3XU@e- zLH2t2D^!wO4Rs{&`30<+VnGBk0>0rF7 z!b|v+xtqt){%M$ptlx3taSpZsyc9Rh@RO@Isn|Bzi+xE5a2`=%kN+%LAR z8hGq&4x(C(zr58!b?Z*FOrdpavxGLk%?aB1wn%6_+8v?2-tGY{zT+sg z{v93!{<5!F5fL)RW6 z0MTwSDE;!pVlpzHG!2+`8~)#;8r?06lLZ;w8r&TK>bi+IMKI9475-Z8CZaT7U@)i% zw)Ln($gzir-aifug2^DKM_QS??5@@uwp>RMWEE@YE52NJ{ULF-hZkJDmx#6nVUQJg z*)ImQ3uN)vRIEjNOmD!ay&ZxPm4JDuJn-z}k*(x||LhhZKF9o$( zyG_DQV_H#6yOS?LR3@0t{0dn1h(T>;KM~!N4GU!V8$f6sgI?!ikSf?UzzgRiVR`Q= z*4%ciIU~MsCFM{{xgw?GY>z`_@Vtxmzm5w&QCef1tXI7+>)F)$V{9Q-+)Zqm_}9~@7S3tQkSZLCh5$`W5gSSnw-dhj z>h%~=354|%VUnUDBAU*GjZF+`KrKKlins&)glWzBwCrY}JWPeUkYOTPa)&pNpC!4g k4PiD0zx{;=Ul9ZnBPyhSpL2i5Y+z)9{Ue4#gjO#0UthZVsQ>@~ delta 3282 zcmZWrc|6ox8~+(=7)xU;4bjMw#AKJH7ujVCQErIr$rk0NF$k|joT8AWON2r*DQg%E z$r9bV*}_c{m8GI?uJ<=*ZoRM1`}^m4=6k;1=RD^*&pC5wH`$YaW5?QCvvcqw2oDcJ zvI)t=%JZPc;z5_IVy4~^+*0^bI2-a`iX2KQ-@(o_PZXh9B{2<9Vw+-GssJVe1A#=2 zSqOx1mOy}t=fQD7WVgUej4;ZzaEO%(nBhPW33PfAn35I+1#~I$C71aKoU_Qr$vhu= zRPS&0R#q2yFah6E;_$(|N`qFVf;;V%;~Ng10vCL=gSBXQeBqkedOLqQ{ji~K0$sed zzdL$E@pH6xao@!e-I-zoR@=TXRir zgi5ep{wL=nP?aXu+6oqODWZy}dE3 zNkLUlq4Z-Ai6m?3NLNmDDhM*-4Zz<@?d8Ro+&a-YoR3bx|-ynf?k`#JgpVp7)G!H-deXS*sS9XSxPnsXRBO$E!y-8&3k1s zmq;F}QH+}A)hbIoy4$%%T`lV6L*EV?J+0%u2bvlql^^=bYlXb_7t^?tIL{+nDdSYC z7dXQkVtGLG0_DDHuen09SQc?Mx6agU1z)Qeu~Z^Kx9arwJ+dqHS=E&MaJyU7hw+7! zE3T941r}YNI^^5NF0LW>b9%1#66YKzd|HY;tuG`v9M(w+vL5|hKNET4nU9lSRZ(eG zZFk}c;XvWRaD3d!J3`gSF7+M>l~eb(od|E(nQFaK5wG&5K=gFcw~^wQns>V9bD_Vq z+qfki4Snl<=JWBpFUzuP_-q}oOumaTv;J&-NUG83t@fR<2=Y*O+1Z@b-TTx$hl2wW zbK=weXjz~9=V~&C!<@fOj1`rMMBe2%f6KGp*TsoPFOa&=g``fs*ApeEm;OF+cDS9_ zFW5W#-lElMdPLexiU9XnOLr+wcfi)S`;TiLNmIwkCHqJ295-SR4F|)dq$}1e7>tL- z(>STZc(-#&1Ewj;59`Yg%K;YTmP@>JsUu<1H4k)W{dLk1ZHMpL$hK zYpxds<+ui4w`eA6-oz5iC-V>LeQ7#pbL_j+z?p<|<(7JRurnIMr$*iRCme5 zhuobPG8MeXteydtMTwB6O9bhi;L?R^1!;>S`SZ}s?m*Gvy%r-c?i$1RgWha zz6E)gKlT_TDr6VgsTN-ynIF{>sm)Z;!ase!DwJ19J9#$o3;Ev@#5zN>aSIYfmjV(X6#!(%=($9M@@7RjcE2nqie=`a{{*>;s`=r^6!#U##F3Uh10iVh6X+h z#RT9@^Yd3;yAvzOFXooy_t%Aljg5C1jdfZ-wCm&6S)lCW{_6G2w%2e%)LiWS@v(6; zg*~e)!}Ib6+lx-bwQ%_xv~Dd4<5eBLXcup5R~?F-)}Gkzmub9)H_>UIcAs8-?$BgK zuD;6WIuvuG(fe3tMnrV`D0YK;)75HGdX_|;$EUF_%ouSOZg^o1OS|BpqpZ%)2^~Eh zQ9kH6nXK|c$JTb7b-SuKbF<_+hn;z_%xgBJvMkmfK@eMz>tT#rJgDqQ(r_Tv(A2c* z)HIwzqp)DnLv@Qd%DkGZA~6UQJ>|&FA#@UdVswj@FeOV;SArI~+ukQ&@cy9Sd&Qca zCRvvZ&%c+SijqF7KcwQdoFjKcY2VBN&*)HWW}tf5b4<`srEfDvTb(0510v7?y~C$m)P;F+UHG-d+$L( z-wKXsYVIi|&DSNm^Adj-#ks3CepN4-iN;m8__(X-e|kr9yt!5KiQKCm&2@{o>4L&G z$J^hf2lHPh5t2XGz8-fPiVs`D*fG-o{+~ZEbeTZjOPL+DRA;Xg4ATLV-g6Lm;eX_c%v8|md+;ZQ~K5(_CP znPfSXvN{epw6BVt9XkK2UStia-}{9X2T+YzoEHxUSx7~_6N|&v1lY063~SOOoXya- zH~@+t8E$Hedup|?Emj^Wu%nLwAl0hz^HBiYTK6#<4rI5gLfhS{1ua{fA+*MAw$OsM zQ_ym@AB5Jq-4)uJ_G8cr(_IBNKLS8B-5w4-rV}uGM8V!pJoru*mtnH{!{<}++z1jP zfFSZ{(my^LrBxgf3&e52y@Ma>gE|Np2^k3A=#YR=Jqm@ZL&#oz8<=PCOEbe-Jb$or z7=3-niX({PkC>>XQ7C}asSTxpodis-36$n{%ED}3_6wgWLieABW<%U0yJGv#wA|pY zP7z2BlnJ~{V?z)Zb_CJcM2>WZTOs-9;YNwFo5;^SK%s{m*uvnKVQx1(+bBU2O$A-;&yPm&^8|Z6w*fl_KeL;_ zF$RI{B#2>bhrjX|1WdjcxLhUyV!dGDAp8x56?_0CQWos%#zDxwn}ETcfY9Y`1qd~u z5Z)h!c4hStVHR&?vCHOY2G-~a+WRvHX2IElP@W_>)*~y;On1MS{{{oy@MIi<7;GZu zfTA8782)1q0aG7|%5*{g6MiUref)QYDKvHnD&4OREz%0ZA%PJyh=!J(94D=G%I>3CdC0v_Y@ZmZubZ!px z*fS5-kB>&VmIlib{bVnURQZH}G0Q+9A|PdOfyF1R!f&APQZZ!i?_*WpvyXt8DM2B$ zfLx%ek2P%@iguTwdt$(BWEfJZpMarMp~r&{+=K+8eECVQ_(l?&{CY^LlFHR%RnuSGD3y>uoS \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null APP_NAME="Gradle" APP_BASE_NAME=`basename "$0"` +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" @@ -30,6 +48,7 @@ die ( ) { cygwin=false msys=false darwin=false +nonstop=false case "`uname`" in CYGWIN* ) cygwin=true @@ -40,26 +59,11 @@ case "`uname`" in MINGW* ) msys=true ;; + NONSTOP* ) + nonstop=true + ;; esac -# Attempt to set APP_HOME -# Resolve links: $0 may be a link -PRG="$0" -# Need this for relative symlinks. -while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi -done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >/dev/null -APP_HOME="`pwd -P`" -cd "$SAVED" >/dev/null - CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar # Determine the Java command to use to start the JVM. @@ -85,7 +89,7 @@ location of your Java installation." fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then MAX_FD_LIMIT=`ulimit -H -n` if [ $? -eq 0 ] ; then if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then diff --git a/gradlew.bat b/gradlew.bat index 72d362da..f6d5974e 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -8,14 +8,14 @@ @rem Set local scope for the variables with windows NT shell if "%OS%"=="Windows_NT" setlocal -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS= - set DIRNAME=%~dp0 if "%DIRNAME%" == "" set DIRNAME=. set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + @rem Find java.exe if defined JAVA_HOME goto findJavaFromJavaHome diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java index 7643d4ea..9a2403d6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java @@ -24,53 +24,57 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.kafka.listener.MessageListener; -import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.messaging.handler.annotation.MessageMapping; /** - * Annotation that marks a method to be the target of a Kafka message - * listener on the specified topics. + * Annotation that marks a method to be the target of a Kafka message listener on the + * specified topics. * - * The {@link #containerFactory()} - * identifies the {@link org.springframework.kafka.config.KafkaListenerContainerFactory + * The {@link #containerFactory()} identifies the + * {@link org.springframework.kafka.config.KafkaListenerContainerFactory * KafkaListenerContainerFactory} to use to build the Kafka listener container. If not - * set, a default container factory is assumed to be available with a bean - * name of {@code kafkaListenerContainerFactory} unless an explicit default has been - * provided through configuration. + * set, a default container factory is assumed to be available with a bean name + * of {@code kafkaListenerContainerFactory} unless an explicit default has been provided + * through configuration. * - *

Processing of {@code @KafkaListener} annotations is performed by - * registering a {@link KafkaListenerAnnotationBeanPostProcessor}. This can be - * done manually or, more conveniently, through {@link EnableKafka} annotation. + *

+ * Processing of {@code @KafkaListener} annotations is performed by registering a + * {@link KafkaListenerAnnotationBeanPostProcessor}. This can be done manually or, more + * conveniently, through {@link EnableKafka} annotation. * - *

Annotated methods are allowed to have flexible signatures similar to what + *

+ * Annotated methods are allowed to have flexible signatures similar to what * {@link MessageMapping} provides, that is *

    - *
  • {@link org.apache.kafka.clients.consumer.ConsumerRecord} to - * access to the raw Kafka message
  • + *
  • {@link org.apache.kafka.clients.consumer.ConsumerRecord} to access to the raw Kafka + * message
  • *
  • {@link org.springframework.kafka.support.Acknowledgment} to manually ack
  • - *
  • {@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated method - * arguments including the support of validation
  • - *
  • {@link org.springframework.messaging.handler.annotation.Header @Header}-annotated method - * arguments to extract a specific header value, defined by + *
  • {@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated + * method arguments including the support of validation
  • + *
  • {@link org.springframework.messaging.handler.annotation.Header @Header}-annotated + * method arguments to extract a specific header value, defined by * {@link org.springframework.kafka.support.KafkaHeaders KafkaHeaders}
  • *
  • {@link org.springframework.messaging.handler.annotation.Headers @Headers}-annotated - * argument that must also be assignable to {@link java.util.Map} for getting access to all - * headers.
  • + * argument that must also be assignable to {@link java.util.Map} for getting access to + * all headers. *
  • {@link org.springframework.messaging.MessageHeaders MessageHeaders} arguments for * getting access to all headers.
  • - *
  • {@link org.springframework.messaging.support.MessageHeaderAccessor MessageHeaderAccessor} - * for convenient access to all method arguments.
  • + *
  • {@link org.springframework.messaging.support.MessageHeaderAccessor + * MessageHeaderAccessor} for convenient access to all method arguments.
  • *
* - *

When defined at the method level, a listener container is created for each method. The - * {@link MessageListener} is a {@link MessagingMessageListenerAdapter}, configured with a - * {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}. + *

When defined at the method level, a listener container is created for each method. + * The {@link MessageListener} is a + * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter}, + * configured with a {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}. * - *

When defined at the class level, a single message listener container is used to service - * all methods annotated with {@code @KafkaHandler}. Method signatures of such annotated - * methods must not cause any ambiguity such that a single method can be resolved for a - * particular inbound message. The {@link MessagingMessageListenerAdapter} is configured with - * a {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}. + *

When defined at the class level, a single message listener container is used to + * service all methods annotated with {@code @KafkaHandler}. Method signatures of such + * annotated methods must not cause any ambiguity such that a single method can be + * resolved for a particular inbound message. The + * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter} is + * configured with a + * {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}. * * @author Gary Russell * diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index fda96fe2..a1ba9427 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -23,6 +23,7 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.ErrorHandler; +import org.springframework.kafka.listener.adapter.DeDuplicationStrategy; import org.springframework.kafka.support.converter.MessageConverter; /** @@ -57,6 +58,8 @@ public abstract class AbstractKafkaListenerContainerFactory deDuplicationStrategy; + /** * Specify a {@link ConsumerFactory} to use. * @param consumerFactory The consumer factory. @@ -140,6 +143,11 @@ public abstract class AbstractKafkaListenerContainerFactory deDuplicationStrategy) { + this.deDuplicationStrategy = deDuplicationStrategy; + } + + @SuppressWarnings("unchecked") @Override public C createListenerContainer(KafkaListenerEndpoint endpoint) { C instance = createContainerInstance(endpoint); @@ -169,6 +177,9 @@ public abstract class AbstractKafkaListenerContainerFactory) endpoint).setDeDuplicationStrategy(this.deDuplicationStrategy); + } endpoint.setupListenerContainer(instance, this.messageConverter); initializeContainer(instance); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 160c6085..c2fdadaf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -33,6 +33,7 @@ import org.springframework.beans.factory.config.BeanExpressionResolver; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.listener.adapter.DeDuplicationStrategy; import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.util.Assert; @@ -67,6 +68,8 @@ public abstract class AbstractKafkaListenerEndpoint private String group; + private DeDuplicationStrategy deDuplicationStrategy; + @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { @@ -193,6 +196,18 @@ public abstract class AbstractKafkaListenerEndpoint } } + protected DeDuplicationStrategy getDeDuplicationStrategy() { + return this.deDuplicationStrategy; + } + + /** + * Set a {@link DeDuplicationStrategy} implementation. + * @param deDuplicationStrategy the strategy implementation. + */ + public void setDeDuplicationStrategy(DeDuplicationStrategy deDuplicationStrategy) { + this.deDuplicationStrategy = deDuplicationStrategy; + } + @Override public void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter) { setupMessageListener(listenerContainer, messageConverter); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 1decc744..6afa71e0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -94,6 +94,9 @@ public class MethodKafkaListenerEndpoint extends AbstractKafkaListenerEndp Assert.state(this.messageHandlerMethodFactory != null, "Could not create message listener - MessageHandlerMethodFactory not set"); MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(); + if (getDeDuplicationStrategy() != null) { + messageListener.setDeDuplicationStrategy(getDeDuplicationStrategy()); + } messageListener.setHandlerMethod(configureListenerAdapter(messageListener)); if (messageConverter != null) { messageListener.setMessageConverter(messageConverter); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index be98ba15..d10afd2c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -458,8 +458,8 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener ackImmediate(record); } else { - throw new IllegalStateException("AckMode must be MANUAL or MANUAL_IMMEDIATE " - + "for manual acks"); + throw new IllegalStateException("AckMode must be MANUAL, " + + "MANUAL_IMMEDIATE, or MANUAL_IMMEDIATE_SYNC for manual acks"); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDeDuplicatingMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDeDuplicatingMessageListener.java new file mode 100644 index 00000000..79fb0f3c --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDeDuplicatingMessageListener.java @@ -0,0 +1,46 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import org.springframework.util.Assert; + +/** + * An abstract message listener adapter that implements de-duplication logic + * via a {@link DeDuplicationStrategy}. + * + * @param the key type. + * @param the value type. + * + * @author Gary Russell + * + */ +public abstract class AbstractDeDuplicatingMessageListener { + + private final DeDuplicationStrategy deDupStrategy; + + protected AbstractDeDuplicatingMessageListener(DeDuplicationStrategy deDupStrategy) { + Assert.notNull(deDupStrategy, "'deDupStrategy' cannot be null"); + this.deDupStrategy = deDupStrategy; + } + + protected boolean isDuplicate(ConsumerRecord consumerRecord) { + return this.deDupStrategy.isDuplicate(consumerRecord); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicatingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicatingMessageListenerAdapter.java new file mode 100644 index 00000000..73050e3d --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicatingMessageListenerAdapter.java @@ -0,0 +1,51 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import org.springframework.kafka.listener.MessageListener; + +/** + * A {@link MessageListener} adapter that implements de-duplication logic + * via a DeDuplicationStrategy. + * + * @param the key type. + * @param the value type. + * + * @author Gary Russell + * + */ +public class DeDuplicatingMessageListenerAdapter extends AbstractDeDuplicatingMessageListener + implements MessageListener { + + private final MessageListener delegate; + + public DeDuplicatingMessageListenerAdapter(DeDuplicationStrategy deDupStrategy, + MessageListener delegate) { + super(deDupStrategy); + this.delegate = delegate; + } + + @Override + public void onMessage(ConsumerRecord consumerRecord) { + if (!isDuplicate(consumerRecord)) { + this.delegate.onMessage(consumerRecord); + } + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicationStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicationStrategy.java new file mode 100644 index 00000000..dfaea7b8 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DeDuplicationStrategy.java @@ -0,0 +1,40 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/** + * Implementations of this interface can signal that a message about + * to be delivered to a message listener is a duplicate. + * + * @param the key type. + * @param the value type. + * + * @author Gary Russell + * + */ +public interface DeDuplicationStrategy { + + /** + * Return true if the record is a duplicate and should be discarded. + * @param consumerRecord the record. + * @return true to discard. + */ + boolean isDuplicate(ConsumerRecord consumerRecord); + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 1c537436..b3b507d4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -59,6 +59,8 @@ public class MessagingMessageListenerAdapter extends AbstractAdaptableMess private MessageConverter messageConverter = new MessagingMessageConverter(); + private DeDuplicationStrategy deDuplicationStrategy; + public MessagingMessageListenerAdapter(Method method) { this.inferredType = determineInferredType(method); @@ -101,6 +103,18 @@ public class MessagingMessageListenerAdapter extends AbstractAdaptableMess invokeHandler(record, acknowledgment, message); } + protected DeDuplicationStrategy getDeDuplicationStrategy() { + return this.deDuplicationStrategy; + } + + /** + * Set a {@link DeDuplicationStrategy} implementation. + * @param deDuplicationStrategy the strategy implementation. + */ + public void setDeDuplicationStrategy(DeDuplicationStrategy deDuplicationStrategy) { + this.deDuplicationStrategy = deDuplicationStrategy; + } + protected Message toMessagingMessage(ConsumerRecord record, Acknowledgment acknowledgment) { return getMessageConverter().toMessage(record, acknowledgment, this.inferredType); } @@ -114,6 +128,9 @@ public class MessagingMessageListenerAdapter extends AbstractAdaptableMess * @return the result of invocation. */ private Object invokeHandler(ConsumerRecord record, Acknowledgment acknowledgment, Message message) { + if (this.deDuplicationStrategy != null && this.deDuplicationStrategy.isDuplicate(record)) { + return null; + } try { return this.handlerMethod.invoke(message, record, acknowledgment); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 16eb3cdd..84d2cb03 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -47,6 +47,7 @@ import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.listener.adapter.DeDuplicationStrategy; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.converter.StringJsonMessageConverter; @@ -95,6 +96,9 @@ public class EnableKafkaIntegrationTests { @Autowired public KafkaListenerEndpointRegistry registry; + @Autowired + private DeDupImpl deDup; + @Test public void testSimple() throws Exception { template.send("annotated1", 0, "foo"); @@ -130,6 +134,7 @@ public class EnableKafkaIntegrationTests { template.flush(); assertThat(this.listener.latch7.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.deDup.called).isTrue(); } @Test @@ -195,9 +200,15 @@ public class EnableKafkaIntegrationTests { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); + factory.setDeDuplicationStrategy(deDup()); return factory; } + @Bean + public DeDupImpl deDup() { + return new DeDupImpl(); + } + @Bean public KafkaListenerContainerFactory> kafkaJsonListenerContainerFactory() { @@ -462,4 +473,16 @@ public class EnableKafkaIntegrationTests { } + public static class DeDupImpl implements DeDuplicationStrategy { + + private boolean called; + + @Override + public boolean isDuplicate(ConsumerRecord consumerRecord) { + called = true; + return false; + } + + } + } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 2c92f2a7..1abc71b0 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -330,3 +330,23 @@ public void listen(@Payload String foo, ... } ---- + +===== Handling Duplicates + +In certain scenarios, such as rebalancing, a message may be redelivered that has already been processed. +The framework cannot know whether such a message has been processed or not, that is an application-level +function. +This is known as the http://www.enterpriseintegrationpatterns.com/patterns/messaging/IdempotentReceiver.html[Idempotent +Receiver] pattern and Spring Integration provides an +http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#idempotent-receiver +[implementation thereof]. + +The Spring for Apache Kafka project also provides some assistance by means of the `DeDuplicatingMessageListenerAdapter` +classe, which can wrap your `MessageListener`. +This class takes an implementation of `DeDuplicationStrategy` where you implement the `isDuplicate` method to signal +that a message is a duplicate and should be discarded. + +Similarly, when using `@KafkaListener`, the `DeDuplicationStrategy` can be injected into the container factory. + +A wrapper is not provided for the `AcknowledgingMessageListener` because you might need to acknowledge the duplicated +message.