Move Kinesis batch checkpoint before sending
Since the listener may take some time for records processing, there is a possibility that checkpoint will be stored too late after the process and thus we are able to get the same records in different channel adapter for the same shard, even if they are in the same consumer group and use shared `MetadataStore` This solution is some compromise for the current state of things and has to be reconsidered in the future in favor of proper rebalance and shard leader election solution As a workaround for the duplicate records an additional `@IdempotentReceiver` approach can be used * Upgrade to Gradle 4.2.1, Checkstyle 8.3, AssertJ 3.8.0 * Fix race condition in the `KinesisMessageDrivenChannelAdapterTests`
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -10,3 +10,4 @@ target
|
||||
/*.ipr
|
||||
/*.iws
|
||||
bin/
|
||||
out
|
||||
|
||||
@@ -3,7 +3,7 @@ buildscript {
|
||||
maven { url 'http://repo.spring.io/plugins-release' }
|
||||
}
|
||||
dependencies {
|
||||
classpath 'io.spring.gradle:dependency-management-plugin:1.0.2.RELEASE'
|
||||
classpath 'io.spring.gradle:dependency-management-plugin:1.0.3.RELEASE'
|
||||
classpath 'io.spring.gradle:spring-io-plugin:0.0.8.RELEASE'
|
||||
}
|
||||
}
|
||||
@@ -31,11 +31,11 @@ repositories {
|
||||
}
|
||||
|
||||
ext {
|
||||
assertjVersion = '3.6.2'
|
||||
assertjVersion = '3.8.0'
|
||||
servletApiVersion = '3.1.0'
|
||||
slf4jVersion = '1.7.25'
|
||||
springCloudAwsVersion = '1.2.1.RELEASE'
|
||||
springIntegrationVersion = '4.3.11.RELEASE'
|
||||
springIntegrationVersion = '4.3.12.RELEASE'
|
||||
|
||||
idPrefix = 'aws'
|
||||
|
||||
@@ -96,7 +96,7 @@ jacoco {
|
||||
|
||||
checkstyle {
|
||||
configFile = file("${rootDir}/src/checkstyle/checkstyle.xml")
|
||||
toolVersion = "8.1"
|
||||
toolVersion = "8.3"
|
||||
}
|
||||
|
||||
dependencies {
|
||||
|
||||
BIN
gradle/wrapper/gradle-wrapper.jar
vendored
BIN
gradle/wrapper/gradle-wrapper.jar
vendored
Binary file not shown.
2
gradle/wrapper/gradle-wrapper.properties
vendored
2
gradle/wrapper/gradle-wrapper.properties
vendored
@@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME
|
||||
distributionPath=wrapper/dists
|
||||
zipStoreBase=GRADLE_USER_HOME
|
||||
zipStorePath=wrapper/dists
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-4.2.1-bin.zip
|
||||
|
||||
@@ -885,6 +885,12 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport i
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Processing records: " + records + " for [" + ShardConsumer.this + "]");
|
||||
}
|
||||
|
||||
// TODO Reconsider this logic after rebalance and shard leader election implementation
|
||||
if (CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
|
||||
this.checkpointer.checkpoint();
|
||||
}
|
||||
|
||||
switch (KinesisMessageDrivenChannelAdapter.this.listenerMode) {
|
||||
case record:
|
||||
for (Record record : records) {
|
||||
@@ -909,6 +915,7 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport i
|
||||
this.checkpointer.checkpoint(record.getSequenceNumber());
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case batch:
|
||||
@@ -925,10 +932,6 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport i
|
||||
break;
|
||||
|
||||
}
|
||||
|
||||
if (CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
|
||||
this.checkpointer.checkpoint();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -205,6 +205,8 @@ public class KinesisMessageDrivenChannelAdapterTests {
|
||||
|
||||
// When resharding happens the describeStream() is performed again
|
||||
verify(this.amazonKinesisForResharding, atLeast(2)).describeStream(any(DescribeStreamRequest.class));
|
||||
|
||||
this.reshardingChannelAdapter.stop();
|
||||
}
|
||||
|
||||
@Configuration
|
||||
|
||||
Reference in New Issue
Block a user