From 6a380c4d1938511a8f6a8023a3cb94ebdc8aca7b Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 19 Jan 2022 16:24:32 -0500 Subject: [PATCH] Change `doOnSubscribe` to `doOnRequest` The `aggregatorFunction` has a flaw subscribing and emitting data from a source `Flux` too early: from a `doOnSubscribe()` callback which is really called *before* the real subscription is registered in the target `Publisher` Change the logic to the `doOnRequest()` which happens already after subscription is registered in the `Publisher` **Cherry-pick to `2020.0.x`** --- .../cloud/fn/aggregator/AggregatorFunctionConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java b/functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java index 166693de..5711987a 100644 --- a/functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java +++ b/functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java @@ -63,7 +63,7 @@ public class AggregatorFunctionConfiguration { FluxMessageChannel outputChannel) { return input -> Flux.from(outputChannel) - .doOnSubscribe((sub) -> inputChannel.subscribeTo(input)); + .doOnRequest((request) -> inputChannel.subscribeTo(input)); } @Bean