diff --git a/.gitignore b/.gitignore index 27c4191..fbb437a 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ target /*.ipr /*.iws bin/ +out diff --git a/build.gradle b/build.gradle index 095e92e..6d3cc8b 100644 --- a/build.gradle +++ b/build.gradle @@ -3,7 +3,7 @@ buildscript { maven { url 'http://repo.spring.io/plugins-release' } } dependencies { - classpath 'io.spring.gradle:dependency-management-plugin:1.0.2.RELEASE' + classpath 'io.spring.gradle:dependency-management-plugin:1.0.3.RELEASE' classpath 'io.spring.gradle:spring-io-plugin:0.0.8.RELEASE' } } @@ -31,11 +31,11 @@ repositories { } ext { - assertjVersion = '3.6.2' + assertjVersion = '3.8.0' servletApiVersion = '3.1.0' slf4jVersion = '1.7.25' springCloudAwsVersion = '1.2.1.RELEASE' - springIntegrationVersion = '4.3.11.RELEASE' + springIntegrationVersion = '4.3.12.RELEASE' idPrefix = 'aws' @@ -96,7 +96,7 @@ jacoco { checkstyle { configFile = file("${rootDir}/src/checkstyle/checkstyle.xml") - toolVersion = "8.1" + toolVersion = "8.3" } dependencies { diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 7a3265e..ed88a04 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index f16d266..74bb778 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.2.1-bin.zip diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java index 235e491..02d77ac 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java @@ -885,6 +885,12 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport i if (logger.isTraceEnabled()) { logger.trace("Processing records: " + records + " for [" + ShardConsumer.this + "]"); } + + // TODO Reconsider this logic after rebalance and shard leader election implementation + if (CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) { + this.checkpointer.checkpoint(); + } + switch (KinesisMessageDrivenChannelAdapter.this.listenerMode) { case record: for (Record record : records) { @@ -909,6 +915,7 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport i this.checkpointer.checkpoint(record.getSequenceNumber()); } } + break; case batch: @@ -925,10 +932,6 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport i break; } - - if (CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) { - this.checkpointer.checkpoint(); - } } } diff --git a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java index b67b739..252cbfc 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java @@ -205,6 +205,8 @@ public class KinesisMessageDrivenChannelAdapterTests { // When resharding happens the describeStream() is performed again verify(this.amazonKinesisForResharding, atLeast(2)).describeStream(any(DescribeStreamRequest.class)); + + this.reshardingChannelAdapter.stop(); } @Configuration