From be2e6ce984cf9173261b542fbb76e1b113498759 Mon Sep 17 00:00:00 2001 From: Marius Bogoevici Date: Fri, 17 Jun 2016 18:12:53 -0400 Subject: [PATCH] GH-117: Move the placement of manual ack handling Fixes GH-117 (https://github.com/spring-projects/spring-kafka/issues/117) - merge `handleManualAcks` - simplify offset handling by removing the `manualOffsets` map - ensure that all acks are flushed on `stop()` - Upgrade to Gradle 2.14 --- gradle/wrapper/gradle-wrapper.jar | Bin 53556 -> 53319 bytes gradle/wrapper/gradle-wrapper.properties | 4 +- .../KafkaMessageListenerContainer.java | 80 ++++++++++-------- .../KafkaMessageListenerContainerTests.java | 61 +++++++++++-- 4 files changed, 100 insertions(+), 45 deletions(-) diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index ca78035ef0501d802d4fc55381ef2d5c3ce0ec6e..d3b83982b9b1bccad955349d702be9b884c6e049 100644 GIT binary patch delta 3698 zcmZWrc|4Tc8-ESc=rZ<^waLD3kz7lQEfGo9$dZw5$QCYXF1pGR@|Gl&B5SspC=nWC z%w%h$1sRmBvXpGa@11wT#1Jm-1NnR7^Y*r~VK(Y99X9KQhoCnsQN z*>eRg&j}fe9oix#R}5T)a_7S4!`Og-Ia&~>REnKxx)fovq{%d(G8=M4{^ZA1SS)~52-CqE&HoXZUX=@9srOB8CYSkKt!0} zAwIyrp>A=)r7}p>+rZPp=ZK&bTlr8&ko9P`O1%KbRTYYju`o$^=F+vg$908HF^{Zg zRg7LVF2#ZUR|flzev92Gt^e@$q}A<9@+${tyJK_re#@Wjy4kR?-aE5_2;tdCV;H8H zT&<$owwkA9DUs2l1y0AVezA6OR<_h?CwO++xD@TUQ{xkP%9cToG)uav983w3aMi*; zBTUTID(>4!R+yuS?E7nn*(xcexAW&)5u35_-RImXK^Tuqqp4c5b9O==0v|P=&0(~@ zJ6w4pOd0T5Hy8SlHtMQ%PW%J^gvO!yA;l*O3LcL0ahUzKA^1j8)eZ$~>VxhFDS?D+ zRpg2OI_}szr3UE?MZL3soX3a71f2Ev3OK(=#|HD>7Yf zvsGx8kIb3U!rX?!t}mXV*oNSO&;Lj+OQx)r$sk>OcU+dE)>L2BH6!o!&fcAI(=1V5 zIwLoG%H6K7K*G(DFIVwnS!BdJQrZnV;Zv@c#&n>W$xp6jpCMfzNqo{E8Z2Eav5J%$ zdrTj!TMTkx+|XupPLi) zTygIA-%wNE|YF`sf*H2s;$R-2=`7NOf#q6kjnVAp7_?!o?szBPvk02X%6m`SBdjL zU+&zFz8h^%Mui@usEEnWnV<*eNJ%A{6;i4G?&Hnbdwz08=@_E&of<_c(G=BBXjgKE z*@VKfk|@n@`9`Jrd^oy=Z|eHPyCXjhszYk$RWL36nu#vM(}UwO%IEJg2o1~mDIWrt z+1E=MN0*5S>b!5{yL~#kjr+qcC){35RZJVLPVhU_HZOd`vG3)Z{A%Z_NB*Bzh*5X! zl|BhN(yZ|{nu7Nnsvd4{(J^oTQIVGO^ut^fI%+;QsoeCO)8JX%y$8^H2E7*&dpc5` zytKt>H|tXfe$gidG*eFY7vKo%rxJ>UC)fV(FJgz1BsKx0+(co>pjW1dY?MG`>WC3zM*9Gp{(oAQ=1br^v7u8 zuEYTfE;Yo0CRI`w*%DZA?79dUp zV*#9~@3KQgfrsap$qpI9LjXBDQI0-bwZFzUe>5X5Gwf0#KT=@kXj za_L$<2e8J`eAqihE0)We9eSzp^R2C8uY`tjL@bIuOkBxy{-=oj#l{!@;QUok3_gkU zJS-b8w@Y4A^vzEVqq4j+B`+LE1AO%2qyWy|qNJC5Ox2Sj3#&Q_9?!*izDU>GnV_6b zU@ndw(u=sLXCF#R>Pcm+HBx^}bu3NAys#a!#qFRv*$pwiTA)uFEQZAJF>-Kxq^tAi~^~~(!HOs)sU3&zJ7Pi~*-ODFO zXW3oIJ(q{rO?UfCkHA}Wsirwaa8F*Izv*Zq_D9>Sj@+DtVAL=vSQhQ(I(QRUGg3Jt04mOr_* zUvDJ&u5ce;uZkPI94c`L<^2^`w>Gt+8priyNo$=mM(bk6oGAgda)XMf%`0xYB&U&k zMSqoFHFom-xEychKC2+5hN_64cno{S;_yMeW-xg6{^$_eebim&= za8^_kI)Xkw(g;gD=T-a z(7LQ2Apa9s2~r+WnB$0ml*s_7=7}YA*77{v#Rv2?x12ilRnb_~dC<-6J1TvK|FETh z%&XafoaK(|>2+Pbc4h{z)x-{2=i$Qg;CZgeQ-ug7y`` z`h-d9f#i#KGOp>yJ=m$Orc8E?R10d(vefYM!vSjN4<-Jtdp~Q#| zUY8Ke0XX741p{5U4HzKy_)f6~jR7DVJY|c=a`H1*BM~TE+l>H#_s|1lFIY&F&GOqu zERUFA@vlZ1u=6l!(lN9XSx5l5fd*?5Fj+V*;;)!eBP@H&O(l* zRj~YQq?Zc7Q{{#8ETo`lgf)mSspo*F^efrf;lY9GIo6bR-AR@oka&>A8EEKZA?1(k zSsb<|kR8p;kavUZBm}iZ04!{gm)UIS$N#pG4I#w7`K$vaTXi7Hl?1&TrT@`DmJ}~!^LJPjxJD6z$pO0pGD&Oz z;K&XDdRxff?O}3(2paTo5CQ!=xM4w-x23o!CjeOS0)W&O!MYpBEr^5$uS zh%#HqjvnAG4>@o9 zO$-8a0pQ4={NMHgK+s6=4H{$8N*S{oOlZrf0V}V&E<_mn_;g zu(F#Kw+%uY??L@BV(w~~sN5rt2(N&$Q{P-&eQQm44wOzgv}?Ck^N%X9u;=e;LOOgc z6aeZ%9WT=zycCn?!k?z0s+{8j0K+Xy>8gVhOzLHIyQWv1jSuYVWmP~8*e&F~C*ZF> Kt!+{*O!R*iz94e| delta 4032 zcma)9dpwle8vYD}ouOO?nPG?+cQaHX+EMOuCrN}5!nmd|8^w+yeQdRdNGK7xq#882 zjLAJIaw#E~L`HUW<$Uwa-a2#6`Qyy*H@|m%&-Xshds%DETEi&dt}Wyiw6)~s*#rRm z{6I9}YNntfKV;1K7Io2?`mSP9%VA4lT)5!;bF6?#WJ7@*Eq}82qD4| zFmRS<1_pwWMKDl<<`?CK5mlifHo_`DQCXzcm$fE` ziQ-q|co13NcqI-ix7KpSzs*2A8gh55Q|VvfYuvSX;&s<9?J>&z^|js~ySQ@OXcM}r z%`&-kdn|8ot8$~Pv0d^M-fF1Mr%HsLMrtM7U6OWFP~vr&w#V9yW-Yi8%T21p8T(=t z(la{9eoOW!T-wAWDHf4`VcdZfE0v!~_d7t9rqB4S#bO!EYLv(C6KQs_%E_9HzH^nW zOeedwKT#eMl(8-ED1=zc=@14pPx29lvC52VA}J-~TA9YXW0X?sHwj%hly;c(gdBQg zt2wZ zw!o0Gl$O8@dS1HLit)lK+FA9F2;?_KMMgeWCik`8y-Lr(FNKDavn$5~U3Fr$le6QR z@pC%DF{4_7tA&Yej;rN~PYbed4wKh}Tb?s4It8c(chtoqmo(q_?HIWfd3S4A4yy12 zwrbvJAUeD8c%Z5(H81b!XmPyBJu7QUx=>LRRXpUI0JUBlw_JLd_w&eGvULT!zg|&1e_`sxfCMF|(@V0BmfC?kf_z3sdmm^z zHFxL5841!rX>6^Jgl1rm$oA={IWiTK#UI8SJE@cY$har?+4Qo47MHNU=$B7E2psBL zAUo`BxI4W`A5$TOx0Rf{e$qL`MdDMo>|S9;cvJK6*Xl?I+|kI${-tMFV&go%EBUf_ zPh!Wn-S1JCC{#+`)vg7R>Z0&AvPN*`DF;fM%b_SFNlM!DT*G|yhieyAz8Txw%H&pC ztQ2Zce7*_^eQN&&EfH!*kJ%$b&u~OdGE}a$>_CT~I!t$voILP4*p+U>Oie|qeY6-X zl*DhPHw2e`E}i|3+UsXhtr9O+Ch)m9)#LnMfenRky6XKNy!>n}Msp%0+$6hF!GkLb zmV-wQ5FUjZdqmv$oR>N_xw+x<4dV=xB~vuHIp3*#(g{s&@<7*~GX3M7hQ6m*SV@@# z&!@n`P_9aWiR#3p$SVZ}cc}WVM0ao3u@%2>1x_l{V9AtXj@f9@v5KtGBAYzJD)(b1 z_|cXz$5BRdD$Ua5Lt6dxFfX}uLa@O+j3+NcqNde%FlX01Ugp;46tP*vntAXh?Vj!C zjYZ=MmeT?yk!hZnLvp;3H(!riZ>Ks&tMAx6Q2T%@BUHVVDpu)qjp;8f?BaWL-=aSC z4HvD*{o#OPs9b#jHALFDW)11p@{zzRfHNk)del(E7+k*FI8rv^P?JQyOu@?!s#DH7 zFcb7Lk4l$Lh5j&Hdb)nXH-54`CS!V=_nyWxCeaJYgm=VvV^lGw}f%(13bOU?b1YCGCVa%U|a{M-p85^p~j&W%Zzy z@s`pfakP&CJoC>y)J`0@Np5@Xn^-wt-v8(1TxYz9Qc#*$=wK8VF>UhuYrT$;dXw6T zAE!{|DlURgLgRu*s5yJ1oI3~~s`|pd%erKQI_swNqw$reZ{sE;XHXHct&27B7Pl+U za-qV+xs4MNlx`_gNrX#14e80Zv*XSp6;4d)t`zjwM7$gnQai73mxtV{b@I#3bxRw& zp8Ne*iBhWk7fa5l@MjdC(MIp@Ik$659!T>-&ky|?Tal<(@tnA#xI*$5B;mcu$nKeD z3Y9NvRxxV!w7tzITOV$|nP+##2h5P_@7>00<(dg}TWS^V1oqe;e0@nyzAi@f^GUBa zE$q}FWgKa;d{QMYxF?bDLA68j4+;YRqytECTZ7oQu{Rt#o+(6NkaGtOSI3AwH}EoZyOoVmkRJAt&> zDXGCPVXS_SlF~PkldH_9cyPdQpx@1e!JFQF?)NDJv;xH@ixeA2j^K6=KQCI|*Uoos zYEms~CcXSqIQCajf)1#crxqpAss6c#_=ta)a9;VEzn5S2lg_ob<4f z-q~1Nu2=eG>BZ$kjFjwqv0@$ToX2xb3C>v*K2_I=C?q@((iIng2TI1Po8W=m!B_0F zn9GTeeHIJ2IPnXzj^M$(R~~i%0N8*hJycFO^mN-OcbhwznOk>ub*i;@^-kKaboV-h zs^RUmh)!BsHAJ0SBGEZCN>Ip7i>ReDseP{ELx$w=h<}lT zk-lT4S-=*1S^r<24D(C$Ftt2hm|FM#$IMxpr zLX3Ok!U}59bT)3RU=U4@wY}yTQ+|nl0eHv2i^Gby`f}H}NePL>mqyJVt|eTeiYb7X;z7VQ8&IQI@r* z%m(f&^g}tzO9B8MYRNyJ+5`<AK}ULTLba9r*DO(mLS~U^bQQ-N+Ot1whip=5P|t=`*sAyLVN*;ug4K1G!j zCjo{YLD0u^2-*hz=wwTIY&Lv$1FB6Ks!f|iD(^bXweOOFW19B)&R>QcbD`Qe9W#sm z&Y2g)7NOK^Er0H7RzprHPB?n<0DzPL0O)f>h`0$YBy%`3gKi8$y&Ni1V`EP19^VbW z0tLJcO%vywYFBOqd{iO6F|SHIW~i-DSNvGza0F7Uh6q?ttrG(WbM3((l51cMff|hJ ze extends AbstractMessageListener private final Consumer consumer; - private final ConcurrentMap> manualOffsets = new ConcurrentHashMap<>(); - private final Map> offsets = new HashMap<>(); private final MessageListener listener; @@ -309,12 +305,17 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener public void onPartitionsAssigned(Collection partitions) { ListenerConsumer.this.assignedPartitions = partitions; if (!ListenerConsumer.this.autoCommit) { - // Commit initial positions - while this is generally redundant + // Commit initial positions - this is generally redundant but // it protects us from the case when another consumer starts + // and rebalance would cause it to reset at the end + // see https://github.com/spring-projects/spring-kafka/issues/110 Map offsets = new HashMap<>(); for (TopicPartition partition : partitions) { offsets.put(partition, new OffsetAndMetadata(consumer.position(partition))); } + if (ListenerConsumer.this.logger.isDebugEnabled()) { + ListenerConsumer.this.logger.debug("Committing: " + offsets); + } if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) { ListenerConsumer.this.consumer.commitSync(offsets); } @@ -398,7 +399,6 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener if (this.containerProperties.getIdleEventInterval() != null) { lastReceive = System.currentTimeMillis(); } - handleManualAcks(); // if the container is set to auto-commit, then execute in the // same thread // otherwise send to the buffering queue @@ -487,8 +487,6 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener finally { this.listenerInvokerFuture = null; } - // handle the last manual acks, after the listeners have closed - handleManualAcks(); processCommits(); if (this.offsets.size() > 0) { // we always commit after stopping the invoker @@ -551,7 +549,7 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } } else { - updateManualOffset(record); + addOffset(record); } } @@ -605,6 +603,7 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } private void processCommits() { + handleManualAcks(); this.count += this.acks.size(); long now; AckMode ackMode = this.containerProperties.getAckMode(); @@ -614,6 +613,10 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } boolean countExceeded = this.count >= this.containerProperties.getAckCount(); if (ackMode.equals(AckMode.BATCH) || ackMode.equals(AckMode.COUNT) && countExceeded) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("Committing in AckMode.COUNT because count " + this.count + + " exceeds configured limit of" + this.containerProperties.getAckCount()); + } commitIfNecessary(); this.count = 0; } @@ -621,11 +624,27 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener now = System.currentTimeMillis(); boolean elapsed = now - this.last > this.containerProperties.getAckTime(); if (ackMode.equals(AckMode.TIME) && elapsed) { + if (this.logger.isDebugEnabled()) { + this.logger + .debug("Committing in AckMode.TIME because time elapsed exceeds configured limit of " + + this.containerProperties.getAckTime()); + } commitIfNecessary(); this.last = now; } - else if ((ackMode.equals(AckMode.COUNT_TIME) || this.isManualAck) - && (elapsed || countExceeded)) { + else if ((ackMode.equals(AckMode.COUNT_TIME) || this.isManualAck) && (elapsed || countExceeded)) { + if (this.logger.isDebugEnabled()) { + if (elapsed) { + this.logger.debug("Committing in AckMode." + ackMode.name() + " because time elapsed " + + "exceeds configured limit of " + this.containerProperties.getAckTime()); + } + else { + this.logger.debug("Committing in AckMode." + ackMode.name() + " because count " + + this.count + " exceeds configured limit of" + + this.containerProperties.getAckCount()); + } + } + commitIfNecessary(); this.last = now; this.count = 0; @@ -674,39 +693,22 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener this.offsets.get(record.topic()).put(record.partition(), record.offset()); } - private void updateManualOffset(ConsumerRecord record) { - if (!this.manualOffsets.containsKey(record.topic())) { - this.manualOffsets.putIfAbsent(record.topic(), new ConcurrentHashMap()); - } - this.manualOffsets.get(record.topic()).put(record.partition(), record.offset()); - } - private void commitIfNecessary() { Map commits = new HashMap<>(); - if (this.isManualAck) { - for (Entry> entry : this.manualOffsets.entrySet()) { - Iterator> iterator = entry.getValue().entrySet().iterator(); - while (iterator.hasNext()) { - Entry offset = iterator.next(); - commits.put(new TopicPartition(entry.getKey(), offset.getKey()), - new OffsetAndMetadata(offset.getValue() + 1)); - iterator.remove(); - } - } - } - else { - for (Entry> entry : this.offsets.entrySet()) { - for (Entry offset : entry.getValue().entrySet()) { - commits.put(new TopicPartition(entry.getKey(), offset.getKey()), - new OffsetAndMetadata(offset.getValue() + 1)); - } + for (Entry> entry : this.offsets.entrySet()) { + for (Entry offset : entry.getValue().entrySet()) { + commits.put(new TopicPartition(entry.getKey(), offset.getKey()), + new OffsetAndMetadata(offset.getValue() + 1)); } } this.offsets.clear(); if (this.logger.isDebugEnabled()) { - this.logger.debug("Committing: " + commits); + this.logger.debug("Commit list: " + commits); } if (!commits.isEmpty()) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("Committing: " + commits); + } try { if (this.containerProperties.isSyncCommits()) { this.consumer.commitSync(commits); @@ -717,6 +719,9 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } catch (WakeupException e) { // ignore - not polling + if (this.logger.isDebugEnabled()) { + this.logger.debug("Woken up during commit"); + } } } } @@ -793,6 +798,9 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } Thread.currentThread().interrupt(); } + if (ListenerConsumer.this.logger.isDebugEnabled()) { + ListenerConsumer.this.logger.debug("Invoker stopped"); + } } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 30a761fc..c9b6445f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -18,9 +18,10 @@ package org.springframework.kafka.listener; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.util.BitSet; @@ -207,7 +208,7 @@ public class KafkaMessageListenerContainerTests { ContainerProperties containerProps = new ContainerProperties(topic5); containerProps.setAckCount(1); containerProps.setPauseAfter(100); - containerProps.setAckMode(AckMode.MANUAL); + containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE_SYNC); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(3); @@ -232,22 +233,68 @@ public class KafkaMessageListenerContainerTests { ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(topic5); - template.sendDefault(0, "foo"); - template.sendDefault(2, "bar"); + template.sendDefault(0, 0, "foo"); + template.sendDefault(1, 2, "bar"); template.flush(); Thread.sleep(300); - template.sendDefault(0, "fiz"); - template.sendDefault(2, "buz"); + template.sendDefault(0, 0, "fiz"); + template.sendDefault(1, 2, "buz"); template.flush(); // Verify that commitSync is called when paused assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); verify(consumer, atLeastOnce()).pause(any(TopicPartition.class), any(TopicPartition.class)); - verify(consumer, atMost(2)).commitSync(any()); + verify(consumer, atLeast(2)).commitSync(any()); verify(consumer, atLeastOnce()).resume(any(TopicPartition.class), any(TopicPartition.class)); container.stop(); } + @Test + public void testCommitsAreFlushedOnStop() throws Exception { + Map props = KafkaTestUtils.consumerProps("flushedOnStop", "false", embeddedKafka); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); + ContainerProperties containerProps = new ContainerProperties(topic5); + containerProps.setAckCount(1); + containerProps.setPauseAfter(100); + // set large values, ensuring that commits don't happen before `stop()` + containerProps.setAckTime(20000); + containerProps.setAckCount(20000); + containerProps.setAckMode(AckMode.COUNT_TIME); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(4); + containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { + logger.info("slow: " + message); + ack.acknowledge(); + latch.countDown(); + }); + container.setBeanName("testManualFlushed"); + + container.start(); + Consumer consumer = spyOnConsumer(container); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(topic5); + template.sendDefault(0, 0, "foo"); + template.sendDefault(1, 2, "bar"); + template.flush(); + Thread.sleep(300); + template.sendDefault(0, 0, "fiz"); + template.sendDefault(1, 2, "buz"); + template.flush(); + + // Verify that commitSync is called when paused + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); + // Verify that just the initial commit is processed before stop + verify(consumer, times(1)).commitSync(any()); + container.stop(); + // Verify that a commit has been made on stop + verify(consumer, times(2)).commitSync(any()); + } + @Test public void testSlowConsumerWithException() throws Exception { logger.info("Start " + this.testName.getMethodName());