GH-3105 Fix hashCode calculation for StreamBridge function

This would ensure that there are no collisions in hashCode calculation for cases where partitionKeyExpression is used in one of the bindings while partitionCount of any of the bindings may result in teh overall StreamBridge function having the same  hashCode

Resolves #3105
This commit is contained in:
Oleg Zhurakousky
2025-03-27 21:59:12 +01:00
parent 1fd331f5bf
commit 1a01c6cbcc
2 changed files with 32 additions and 1 deletions

View File

@@ -276,7 +276,7 @@ class StreamBridgeTests {
@SuppressWarnings("rawtypes")
@Test
void test_2785() throws Exception {
void test_2783() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
EmptyConfiguration.class)).web(WebApplicationType.NONE).run(
@@ -304,6 +304,33 @@ class StreamBridgeTests {
}
}
@Test
void test_3105() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
EmptyConfiguration.class)).web(WebApplicationType.NONE).run(
"--spring.cloud.stream.source=outputA;outputB",
"--spring.cloud.stream.bindings.outputA-out-0.producer.partition-count=2",
"--spring.cloud.stream.bindings.outputA-out-0.content-type=application/json",
"--spring.cloud.stream.bindings.outputB-out-0.producer.partition-count=8",
"--spring.cloud.stream.bindings.outputB-out-0.content-type=application/json",
"--spring.cloud.stream.bindings.outputB-out-0.producer.partition-key-expression=headers['partitionKey']",
"--spring.jmx.enabled=false")) {
StreamBridge streamBridge = context.getBean(StreamBridge.class);
Field field = ReflectionUtils.findField(StreamBridge.class, "streamBridgeFunctionCache");
Objects.requireNonNull(field).setAccessible(true);
streamBridge.send("outputB-out-0", MessageBuilder.withPayload("outputB").setHeader("partitionKey", "oleg").build());
streamBridge.send("outputA-out-0", MessageBuilder.withPayload("outputA").build());
/*
* Nothing to assert other than this test should not fail due to the cache collision described in GH-3105
*/
}
}
/*
* This test verifies that when a partition key expression is set, then scst_partition is always set, even in
* concurrent scenarios.

View File

@@ -240,6 +240,10 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi
+ Boolean.hashCode(producerProperties.isPartitioned())
+ producerProperties.getPartitionCount();
if (producerProperties.getPartitionKeyExpression() != null && producerProperties.getBindingName() != null) {
hash += producerProperties.getBindingName().hashCode();
}
return hash;
}