GH-52: Bypass SCSt partitioning algorithm

Fixes: spring-cloud-stream-binder-aws-kinesis#52

The AWS Kinesis uses its own, server-side, algorithm to determine the
target shard for the record, therefore we don't need to use SCSt
partitioning functionality and just propagate a provided
`partitionKeyExpression` down to the `KinesisMessageHandler`
This commit is contained in:
Artem Bilan
2018-06-08 16:01:12 -04:00
parent ea8d5652f5
commit 6456e0ab9d
4 changed files with 48 additions and 28 deletions

View File

@@ -15,9 +15,9 @@
<packaging>pom</packaging>
<properties>
<spring-cloud-stream.version>2.0.0.RELEASE</spring-cloud-stream.version>
<spring-cloud-aws.version>2.0.0.M4</spring-cloud-aws.version>
<spring-integration-aws-version>2.0.0.M2</spring-integration-aws-version>
<spring-cloud-stream.version>2.0.1.BUILD-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-aws.version>2.0.0.RC2</spring-cloud-aws.version>
<spring-integration-aws-version>2.0.0.BUILD-SNAPSHOT</spring-integration-aws-version>
<java.version>1.8</java.version>
</properties>

View File

@@ -36,10 +36,12 @@ AWS Kinesis uses the partition key as input to a hash function that maps the par
Specifically, an MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards.
As a result of this hashing mechanism, all data records with the same partition key map to the same shard within the stream.
But at the same time we can't select target shard to send explicitly.
Although calculation the hash manually (and use `explicitHashKeyExpression` for producer, respectively), we may track the target shard by inclusion into its `HashKeyRange`.
Although calculating the hash manually (and use `explicitHashKeyExpression` for producer, respectively), we may track the target shard by inclusion into its `HashKeyRange`.
By default partition key is a result of the `Object.hash()` from the message `payload`.
The Spring Cloud Stream partition handling logic is excluded in case of AWS Kinesis Binder since it is out of use and the provided `producer.partitionKeyExpression` is propagated to the `KinesisMessageHandler` directly.
On the consumer side the `instanceCount` and `instanceIndex` are used to distribute shards between consumers in group evenly.
== Configuration Options

View File

@@ -38,13 +38,16 @@ import org.springframework.cloud.stream.binder.kinesis.properties.KinesisExtende
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisProducerProperties;
import org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisConsumerDestination;
import org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.expression.Expression;
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter;
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageHeaderErrorMessageStrategy;
import org.springframework.integration.aws.inbound.kinesis.KinesisShardOffset;
import org.springframework.integration.aws.inbound.kinesis.ListenerMode;
import org.springframework.integration.aws.outbound.KinesisMessageHandler;
import org.springframework.integration.channel.ChannelInterceptorAware;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
@@ -52,6 +55,7 @@ import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
@@ -105,9 +109,6 @@ public class KinesisMessageChannelBinder extends
return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
}
// below are the main methods to implement - these will create the message
// handlers used by the application
// to put and consume messages
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<KinesisProducerProperties> producerProperties, MessageChannel errorChannel) {
@@ -116,14 +117,14 @@ public class KinesisMessageChannelBinder extends
kinesisMessageHandler.setSync(producerProperties.getExtension().isSync());
kinesisMessageHandler.setSendTimeout(producerProperties.getExtension().getSendTimeout());
kinesisMessageHandler.setStream(destination.getName());
if (producerProperties.isPartitioned()) {
Expression partitionKeyExpression = producerProperties.getPartitionKeyExpression();
if (partitionKeyExpression != null) {
kinesisMessageHandler
.setPartitionKeyExpressionString(
"'partitionKey-' + headers['" + BinderHeaders.PARTITION_HEADER + "']");
.setPartitionKeyExpression(partitionKeyExpression);
}
else {
kinesisMessageHandler
.setPartitionKeyExpression(new FunctionExpression<Message<?>>(m -> m.getPayload().hashCode()));
kinesisMessageHandler.setPartitionKeyExpression(
new FunctionExpression<Message<?>>(m -> m.getPayload().hashCode()));
}
kinesisMessageHandler.setFailureChannel(errorChannel);
kinesisMessageHandler.setBeanFactory(getBeanFactory());
@@ -131,6 +132,22 @@ public class KinesisMessageChannelBinder extends
return kinesisMessageHandler;
}
@Override
protected void postProcessOutputChannel(MessageChannel outputChannel,
ExtendedProducerProperties<KinesisProducerProperties> producerProperties) {
if (outputChannel instanceof ChannelInterceptorAware) {
ChannelInterceptorAware channelInterceptorAware = (ChannelInterceptorAware) outputChannel;
List<ChannelInterceptor> channelInterceptors = channelInterceptorAware.getChannelInterceptors();
for (ChannelInterceptor channelInterceptor : channelInterceptors) {
if (channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor) {
channelInterceptorAware.removeInterceptor(channelInterceptor);
break;
}
}
}
}
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<KinesisConsumerProperties> properties) {
@@ -179,7 +196,7 @@ public class KinesisMessageChannelBinder extends
}
else {
adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis,
shardOffsets.toArray(new KinesisShardOffset[shardOffsets.size()]));
shardOffsets.toArray(new KinesisShardOffset[0]));
}
boolean anonymous = !StringUtils.hasText(group);
@@ -196,18 +213,18 @@ public class KinesisMessageChannelBinder extends
switch (kinesisConsumerProperties.getListenerMode()) {
case record:
adapter.setListenerMode(ListenerMode.record);
break;
case record:
adapter.setListenerMode(ListenerMode.record);
break;
case batch:
adapter.setListenerMode(ListenerMode.batch);
break;
case batch:
adapter.setListenerMode(ListenerMode.batch);
break;
case rawRecords:
adapter.setListenerMode(ListenerMode.batch);
adapter.setConverter(null);
break;
case rawRecords:
adapter.setListenerMode(ListenerMode.batch);
adapter.setConverter(null);
break;
}

View File

@@ -317,8 +317,7 @@ public class KinesisBinderTests
consumerProperties);
ExtendedProducerProperties<KinesisProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload"));
producerProperties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
producerProperties.setPartitionKeyExpression(spelExpressionParser.parseExpression("headers.partitionKey"));
producerProperties.setPartitionCount(3);
DirectChannel output = createBindableChannel("test.output",
@@ -333,12 +332,14 @@ public class KinesisBinderTests
}
Message<Integer> message2 = MessageBuilder.withPayload(2)
.setHeader("partitionKey", 2)
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "foo")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 42)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 43).build();
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 43)
.build();
output.send(message2);
output.send(new GenericMessage<>(1));
output.send(new GenericMessage<>(0));
output.send(MessageBuilder.withPayload(1).setHeader("partitionKey", 1).build());
output.send(MessageBuilder.withPayload(0).setHeader("partitionKey", 0).build());
assertThat(receiveLatch.await(20, TimeUnit.SECONDS)).isTrue();