From 41b389e574c2b48278c6602b2016c2a39b5905ff Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 19 Oct 2017 13:43:06 -0400 Subject: [PATCH] Move Kinesis batch checkpoint before sending Since the listener may take some time for records processing, there is a possibility that checkpoint will be stored too late after the process and thus we are able to get the same records in different channel adapter for the same shard, even if they are in the same consumer group and use shared `MetadataStore` This solution is some compromise for the current state of things and has to be reconsidered in the future in favor of proper rebalance and shard leader election solution As a workaround for the duplicate records an additional `@IdempotentReceiver` approach can be used * Upgrade to Gradle 4.2.1, Checkstyle 8.3, AssertJ 3.8.0 * Fix race condition in the `KinesisMessageDrivenChannelAdapterTests` --- .gitignore | 1 + build.gradle | 8 ++++---- gradle/wrapper/gradle-wrapper.jar | Bin 54708 -> 54712 bytes gradle/wrapper/gradle-wrapper.properties | 2 +- .../KinesisMessageDrivenChannelAdapter.java | 11 +++++++---- ...nesisMessageDrivenChannelAdapterTests.java | 2 ++ 6 files changed, 15 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 27c4191..fbb437a 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ target /*.ipr /*.iws bin/ +out diff --git a/build.gradle b/build.gradle index 095e92e..6d3cc8b 100644 --- a/build.gradle +++ b/build.gradle @@ -3,7 +3,7 @@ buildscript { maven { url 'http://repo.spring.io/plugins-release' } } dependencies { - classpath 'io.spring.gradle:dependency-management-plugin:1.0.2.RELEASE' + classpath 'io.spring.gradle:dependency-management-plugin:1.0.3.RELEASE' classpath 'io.spring.gradle:spring-io-plugin:0.0.8.RELEASE' } } @@ -31,11 +31,11 @@ repositories { } ext { - assertjVersion = '3.6.2' + assertjVersion = '3.8.0' servletApiVersion = '3.1.0' slf4jVersion = '1.7.25' springCloudAwsVersion = '1.2.1.RELEASE' - springIntegrationVersion = '4.3.11.RELEASE' + springIntegrationVersion = '4.3.12.RELEASE' idPrefix = 'aws' @@ -96,7 +96,7 @@ jacoco { checkstyle { configFile = file("${rootDir}/src/checkstyle/checkstyle.xml") - toolVersion = "8.1" + toolVersion = "8.3" } dependencies { diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 7a3265ee94c0ab25cf079ac8ccdf87f41d455d42..ed88a042a287c140a32e1639edfc91b2a233da8c 100644 GIT binary patch delta 795 zcmYk4T}YEr7{}l9IOewecvrq|zRp&Pu)Rae%932Deb8trl2#f~U1)|>mXMGjAzer% z&iqrBDUui62!${~D9~`z+FbKvc2ief^yN(!L>FCjJZkP-{r}JJd7g95bB=#FVQe^| zd!KQ6Puiz4Ns><8FReCw%lO&6+{~nIb;N&j(mcOgCsleA4KI|~35Dlufje+)qXND_ zm7$-C zNCU=IXTZy;owQ_Gcc$r5`e0pgFlB6mGbH1oT~6Y=uC2Rv0WW0hF>X)8$7ziUve!Zu zJc`J0<;CaQ^8~EOOGO87rsT&%W4?ez`K!=b7!R`w1w3AuEGqAL;^8fifX_WiXnL#B zW-qtdsPJ0F5gg_6ru73$lC39HMV@L=&=@*Oj#?q#gbtJLtdKe35;E7rRiBGHVU1mb zKkR0MSPq|K8Y*WlQS>sNw%G7~ri|*Z3i&r?M|DJ{V3V+&k-hZf2A4Vb5-FgL7A_Cq z^gE5RTDf%Kd}}hsxHY$lq>8poajRWXl}@&cP)~a=Cl~ zP~a;;g!9j{Dl>u2)sgi94?593S4>K;kTtzYqOGnkepr7V3s~Hj&Nt9#zF)LW9We8L z8aW4rwJmt4VF>L*4siH;&A#wR+e7Y%>Eil69rB*$v$$2d$AZgkDa@W)gZKs0ud hdM7b5xg9l&a_0Yk%%8%#@f=)z#qC9x{!m~g_z#MO4vPQ) delta 660 zcmYL{T}V@59LCT4H#Q?>XANaLQ<7DtN0!d8mPQ(d#Y~%QMV3|&tMxU>NJ!X)MC3)G z_AvS;@^)P$R0w^LIgf76wz=)XuIk3JAY2z+_x7^#T>XB}bDkFt@0kb}ya*SJ>{#m@ zU2-)`lH?t2@#=o{<@%U3qKv<~Hf2~Nx!d%GuJFnw6Yq^^iMUqZ1|lBz0AC{x_70Y5 zCoN0JX(jMat5n7W9%2fr<1)_Z=GD{?PCPv++i~KVE1jt z?KzbNFfq4_9{cdxw+)7@jGb|Xs{C9nIO*RcqA?!LU&LUb44>kA=yZT%jcAU}D>T)M zFTGW0$;f!d_5M{{9^8l@^A5VvFHq}OaQvf6)&Zg21MBgIt05@V9klXR#`c85Jm^eZ zq1<&*EGJr-33bVHR5A?5-D-3XmtuEX#`mN`g?B_$n`)%ekiedhQM4~p3ZErYr`T^e z7S2=hkvQ%BLKAIyBCsNTfP7;dZMc=LV=~^RJ?tBdnJ)Gd7cyt*!z4aWHt`QsSqP2U Zdh7q6h+m|6^)fjv;byi**Btp9{sG{o{Gk8< diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index f16d266..74bb778 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.2.1-bin.zip diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java index 235e491..02d77ac 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java @@ -885,6 +885,12 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport i if (logger.isTraceEnabled()) { logger.trace("Processing records: " + records + " for [" + ShardConsumer.this + "]"); } + + // TODO Reconsider this logic after rebalance and shard leader election implementation + if (CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) { + this.checkpointer.checkpoint(); + } + switch (KinesisMessageDrivenChannelAdapter.this.listenerMode) { case record: for (Record record : records) { @@ -909,6 +915,7 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport i this.checkpointer.checkpoint(record.getSequenceNumber()); } } + break; case batch: @@ -925,10 +932,6 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport i break; } - - if (CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) { - this.checkpointer.checkpoint(); - } } } diff --git a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java index b67b739..252cbfc 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java @@ -205,6 +205,8 @@ public class KinesisMessageDrivenChannelAdapterTests { // When resharding happens the describeStream() is performed again verify(this.amazonKinesisForResharding, atLeast(2)).describeStream(any(DescribeStreamRequest.class)); + + this.reshardingChannelAdapter.stop(); } @Configuration