diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java index 06aad0192..014fa751b 100644 --- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java +++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java @@ -276,7 +276,7 @@ class StreamBridgeTests { @SuppressWarnings("rawtypes") @Test - void test_2785() throws Exception { + void test_2783() throws Exception { try (ConfigurableApplicationContext context = new SpringApplicationBuilder( TestChannelBinderConfiguration.getCompleteConfiguration( EmptyConfiguration.class)).web(WebApplicationType.NONE).run( @@ -304,6 +304,33 @@ class StreamBridgeTests { } } + @Test + void test_3105() throws Exception { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration( + EmptyConfiguration.class)).web(WebApplicationType.NONE).run( + "--spring.cloud.stream.source=outputA;outputB", + "--spring.cloud.stream.bindings.outputA-out-0.producer.partition-count=2", + "--spring.cloud.stream.bindings.outputA-out-0.content-type=application/json", + + "--spring.cloud.stream.bindings.outputB-out-0.producer.partition-count=8", + "--spring.cloud.stream.bindings.outputB-out-0.content-type=application/json", + "--spring.cloud.stream.bindings.outputB-out-0.producer.partition-key-expression=headers['partitionKey']", + + "--spring.jmx.enabled=false")) { + StreamBridge streamBridge = context.getBean(StreamBridge.class); + Field field = ReflectionUtils.findField(StreamBridge.class, "streamBridgeFunctionCache"); + Objects.requireNonNull(field).setAccessible(true); + + + streamBridge.send("outputB-out-0", MessageBuilder.withPayload("outputB").setHeader("partitionKey", "oleg").build()); + streamBridge.send("outputA-out-0", MessageBuilder.withPayload("outputA").build()); + /* + * Nothing to assert other than this test should not fail due to the cache collision described in GH-3105 + */ + } + } + /* * This test verifies that when a partition key expression is set, then scst_partition is always set, even in * concurrent scenarios. diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java index 688fe4901..907b51728 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java @@ -248,6 +248,10 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi + Boolean.hashCode(producerProperties.isPartitioned()) + producerProperties.getPartitionCount(); + if (producerProperties.getPartitionKeyExpression() != null && producerProperties.getBindingName() != null) { + hash += producerProperties.getBindingName().hashCode(); + } + return hash; }