From 34acc2d190ee5798d8ebb3df4639969f75d84368 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 9 Mar 2017 18:00:55 -0500 Subject: [PATCH] Kafka GH-21: Don't Override User Partition See spring-cloud/spring-cloud-stream-binder-kafka#109 If the user app sets the `BinderHeader.PARTITION_HEADER`, don't override it in the binder. --- .../binding/MessageConverterConfigurer.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageConverterConfigurer.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageConverterConfigurer.java index 735863552..920b45fe0 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageConverterConfigurer.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageConverterConfigurer.java @@ -275,12 +275,16 @@ public class MessageConverterConfigurer implements MessageChannelConfigurer, Bea @Override public Message preSend(Message message, MessageChannel channel) { - int partition = this.partitionHandler.determinePartition(message); - return MessageConverterConfigurer.this.messageBuilderFactory - .fromMessage(message) - .setHeader(BinderHeaders.PARTITION_HEADER, partition) - .build(); - + if (!message.getHeaders().containsKey(BinderHeaders.PARTITION_HEADER)) { + int partition = this.partitionHandler.determinePartition(message); + return MessageConverterConfigurer.this.messageBuilderFactory + .fromMessage(message) + .setHeader(BinderHeaders.PARTITION_HEADER, partition) + .build(); + } + else { + return message; + } } }