Change doOnSubscribe to doOnRequest

The `aggregatorFunction` has a flaw subscribing and emitting
data from a source `Flux` too early: from a `doOnSubscribe()`
callback which is really called *before* the real subscription is
registered in the target `Publisher`

Change the logic to the `doOnRequest()` which happens already
after subscription is registered in the `Publisher`

**Cherry-pick to `2020.0.x`**
This commit is contained in:
Artem Bilan
2022-01-19 16:24:32 -05:00
committed by Soby Chacko
parent 993bc7f42b
commit 6a380c4d19

View File

@@ -63,7 +63,7 @@ public class AggregatorFunctionConfiguration {
FluxMessageChannel outputChannel) { FluxMessageChannel outputChannel) {
return input -> Flux.from(outputChannel) return input -> Flux.from(outputChannel)
.doOnSubscribe((sub) -> inputChannel.subscribeTo(input)); .doOnRequest((request) -> inputChannel.subscribeTo(input));
} }
@Bean @Bean