From 1a01c6cbcc362be46ecb0990a3d7828bba402446 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 27 Mar 2025 21:59:12 +0100 Subject: [PATCH] GH-3105 Fix hashCode calculation for StreamBridge function This would ensure that there are no collisions in hashCode calculation for cases where partitionKeyExpression is used in one of the bindings while partitionCount of any of the bindings may result in teh overall StreamBridge function having the same hashCode Resolves #3105 --- .../stream/function/StreamBridgeTests.java | 29 ++++++++++++++++++- .../cloud/stream/function/StreamBridge.java | 4 +++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java index 06aad0192..014fa751b 100644 --- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java +++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java @@ -276,7 +276,7 @@ class StreamBridgeTests { @SuppressWarnings("rawtypes") @Test - void test_2785() throws Exception { + void test_2783() throws Exception { try (ConfigurableApplicationContext context = new SpringApplicationBuilder( TestChannelBinderConfiguration.getCompleteConfiguration( EmptyConfiguration.class)).web(WebApplicationType.NONE).run( @@ -304,6 +304,33 @@ class StreamBridgeTests { } } + @Test + void test_3105() throws Exception { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration( + EmptyConfiguration.class)).web(WebApplicationType.NONE).run( + "--spring.cloud.stream.source=outputA;outputB", + "--spring.cloud.stream.bindings.outputA-out-0.producer.partition-count=2", + "--spring.cloud.stream.bindings.outputA-out-0.content-type=application/json", + + "--spring.cloud.stream.bindings.outputB-out-0.producer.partition-count=8", + "--spring.cloud.stream.bindings.outputB-out-0.content-type=application/json", + "--spring.cloud.stream.bindings.outputB-out-0.producer.partition-key-expression=headers['partitionKey']", + + "--spring.jmx.enabled=false")) { + StreamBridge streamBridge = context.getBean(StreamBridge.class); + Field field = ReflectionUtils.findField(StreamBridge.class, "streamBridgeFunctionCache"); + Objects.requireNonNull(field).setAccessible(true); + + + streamBridge.send("outputB-out-0", MessageBuilder.withPayload("outputB").setHeader("partitionKey", "oleg").build()); + streamBridge.send("outputA-out-0", MessageBuilder.withPayload("outputA").build()); + /* + * Nothing to assert other than this test should not fail due to the cache collision described in GH-3105 + */ + } + } + /* * This test verifies that when a partition key expression is set, then scst_partition is always set, even in * concurrent scenarios. diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java index f8cd90f5c..c256080db 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java @@ -240,6 +240,10 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi + Boolean.hashCode(producerProperties.isPartitioned()) + producerProperties.getPartitionCount(); + if (producerProperties.getPartitionKeyExpression() != null && producerProperties.getBindingName() != null) { + hash += producerProperties.getBindingName().hashCode(); + } + return hash; }