diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamAutoConfiguration.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamAutoConfiguration.java index 044c89835..808f761bd 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamAutoConfiguration.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamAutoConfiguration.java @@ -62,7 +62,8 @@ public class StreamAutoConfiguration { @Bean public SupplierInvokingMessageProducer supplierInvoker( FunctionCatalog registry) { - return new SupplierInvokingMessageProducer(registry, properties.getSource().getName()); + return new SupplierInvokingMessageProducer(registry, + properties.getSource().getName()); } } @@ -80,7 +81,8 @@ public class StreamAutoConfiguration { FunctionInspector functionInspector, @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { return new StreamListeningFunctionInvoker(registry, functionInspector, - compositeMessageConverterFactory, properties.getDefaultRoute()); + compositeMessageConverterFactory, properties.getDefaultRoute(), + properties.isShared()); } } @@ -91,7 +93,7 @@ public class StreamAutoConfiguration { @Autowired private StreamConfigurationProperties properties; - + public SinkConfiguration() { } @@ -100,7 +102,8 @@ public class StreamAutoConfiguration { FunctionInspector functionInspector, @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { return new StreamListeningConsumerInvoker(registry, functionInspector, - compositeMessageConverterFactory, properties.getSink().getName()); + compositeMessageConverterFactory, properties.getSink().getName(), + properties.isShared()); } } @@ -125,7 +128,7 @@ public class StreamAutoConfiguration { private static class SinkOnlyCondition extends SpringBootCondition { private SourceAndSinkCondition processor = new SourceAndSinkCondition(); - + private SinkCondition sink = new SinkCondition(); @Override @@ -137,7 +140,8 @@ public class StreamAutoConfiguration { if (sink.matches(context, metadata)) { return ConditionOutcome.match("Sink is explicitly enabled"); } - return ConditionOutcome.noMatch("Sink is not enabled and not available through Processor"); + return ConditionOutcome + .noMatch("Sink is not enabled and not available through Processor"); } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamConfigurationProperties.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamConfigurationProperties.java index 7a92c4536..9c6df8f7a 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamConfigurationProperties.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamConfigurationProperties.java @@ -30,6 +30,8 @@ public class StreamConfigurationProperties { private Processor processor = new Processor(); + private boolean shared; + public static final String ROUTE_KEY = "stream_routekey"; public Sink getSink() { @@ -44,6 +46,14 @@ public class StreamConfigurationProperties { return this.processor; } + public boolean isShared() { + return this.shared; + } + + public void setShared(boolean shared) { + this.shared = shared; + } + public static class Sink { /** @@ -72,6 +82,7 @@ public class StreamConfigurationProperties { public void setEnabled(boolean enabled) { this.enabled = enabled; } + } public static class Source { @@ -149,10 +160,11 @@ public class StreamConfigurationProperties { public void setEnabled(boolean enabled) { this.enabled = enabled; } + } public String getDefaultRoute() { - return processor.getName()!=null ? processor.getName() : sink.getName(); + return processor.getName() != null ? processor.getName() : sink.getName(); } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningConsumerInvoker.java index 71267b9b5..d62d92e8a 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningConsumerInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningConsumerInvoker.java @@ -37,6 +37,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author Dave Syer @@ -55,19 +56,21 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto private final Map processors = new HashMap<>(); - private int count = -1; - - private static final FluxMessageProcessor NOENDPOINT = flux -> Flux.empty(); + private static final FluxMessageProcessor NOENDPOINT = flux -> Mono.empty(); private static final Object UNCONVERTED = new Object(); + private boolean share; + public StreamListeningConsumerInvoker(FunctionCatalog functionCatalog, FunctionInspector functionInspector, - CompositeMessageConverterFactory converterFactory, String defaultRoute) { + CompositeMessageConverterFactory converterFactory, String defaultRoute, + boolean share) { this.functionCatalog = functionCatalog; this.functionInspector = functionInspector; this.converterFactory = converterFactory; this.defaultRoute = defaultRoute; + this.share = share; } @Override @@ -77,32 +80,35 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto @StreamListener public void handle(@Input(Processor.INPUT) Flux> input) { - input.groupBy(this::select).flatMap(group -> group.key().process(group)).subscribe(); + input.groupBy(this::select).flatMap(group -> group.key().process(group)) + .subscribe(); } - private Flux> consumer(String name, Flux> flux) { + private Mono consumer(String name, Flux> flux) { Consumer consumer = functionCatalog.lookup(Consumer.class, name); consumer.accept(flux.map(message -> convertInput(consumer).apply(message)) .filter(transformed -> transformed != UNCONVERTED)); - return Flux.empty(); + return Mono.empty(); } - private Flux> balance(List names, Flux> flux) { + private Mono balance(List names, Flux> flux) { if (names.isEmpty()) { - return Flux.empty(); + return Mono.empty(); } - String name = choose(names); - if (functionCatalog.lookup(Consumer.class, name) != null) { - return consumer(name, flux); + Flux result = Flux.empty(); + if (names.size() > 1) { + if (this.share) { + flux = flux.share(); + } + else { + return Mono.error(new IllegalStateException( + "Multiple matches and share disabled: " + names)); + } } - return Flux.empty(); - } - - private synchronized String choose(List names) { - if (++count >= names.size() || count < 0) { - count = 0; + for (String name : names) { + result = result.zipWith(consumer(name, flux)); } - return names.get(count); + return result.then(); } private FluxMessageProcessor select(Message input) { @@ -188,7 +194,7 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto } interface FluxMessageProcessor { - Flux> process(Flux> flux); + Mono process(Flux> flux); } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java index 13d967dbb..2b31d0ca5 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java @@ -60,19 +60,21 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto private final Map processors = new HashMap<>(); - private int count = -1; - private static final FluxMessageProcessor NOENDPOINT = flux -> Flux.empty(); private static final Object UNCONVERTED = new Object(); + private boolean share; + public StreamListeningFunctionInvoker(FunctionCatalog functionCatalog, FunctionInspector functionInspector, - CompositeMessageConverterFactory converterFactory, String defaultRoute) { + CompositeMessageConverterFactory converterFactory, String defaultRoute, + boolean share) { this.functionCatalog = functionCatalog; this.functionInspector = functionInspector; this.converterFactory = converterFactory; this.defaultRoute = defaultRoute; + this.share = share; } @Override @@ -96,7 +98,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto result = result.map(message -> MessageUtils.unpack(function, message)); } Flux> aggregate = headers(values); - return result.withLatestFrom(aggregate, (p, m) -> message(p, m)); + return aggregate.withLatestFrom(result, + (map, payload) -> message(map, payload)); }); } @@ -104,10 +107,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto return flux.map(message -> message.getHeaders()); } - private Message message(Object result, Map headers) { + private Message message(Map headers, Object result) { return result instanceof Message - // TODO: why do we have to do this? The headers should have come with the - // result. ? MessageBuilder.fromMessage((Message) result) .copyHeadersIfAbsent(headers).build() : MessageBuilder.withPayload(result).copyHeadersIfAbsent(headers).build(); @@ -115,27 +116,37 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto private Flux> consumer(String name, Flux> flux) { Consumer consumer = functionCatalog.lookup(Consumer.class, name); + flux = flux.publish().refCount(2); + // The consumer will subscribe to the input flux, so we need to listen separately consumer.accept(flux.map(message -> convertInput(consumer).apply(message)) .filter(transformed -> transformed != UNCONVERTED)); - return Flux.empty(); + return flux.ignoreElements().flux(); } private Flux> balance(List names, Flux> flux) { if (names.isEmpty()) { return Flux.empty(); } - String name = choose(names); - if (functionCatalog.lookup(Consumer.class, name) != null) { - return consumer(name, flux); + flux = flux.hide(); + Flux> result = Flux.empty(); + if (names.size() > 1) { + if (this.share) { + flux = flux.publish().refCount(names.size()); + } + else { + return Flux.error(new IllegalStateException( + "Multiple matches and share disabled: " + names)); + } } - return function(name, flux); - } - - private synchronized String choose(List names) { - if (++count >= names.size() || count < 0) { - count = 0; + for (String name : names) { + if (functionCatalog.lookup(Consumer.class, name) != null) { + result = result.mergeWith(consumer(name, flux)); + } + else { + result = result.mergeWith(function(name, flux)); + } } - return names.get(count); + return result; } private FluxMessageProcessor select(Message input) { @@ -149,7 +160,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto name = stash(defaultRoute); } if (name == null) { - Set names = new LinkedHashSet<>(functionCatalog.getNames(Function.class)); + Set names = new LinkedHashSet<>( + functionCatalog.getNames(Function.class)); names.addAll(functionCatalog.getNames(Consumer.class)); List matches = new ArrayList<>(); if (names.size() == 1) { @@ -175,7 +187,6 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto name = stash(matches.iterator().next()); } else { - // TODO: do we really want this? Or maybe warn that it is happening? return flux -> balance(matches, flux); } } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingMixedTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingMixedTests.java index be7f083f6..3a6607f31 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingMixedTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingMixedTests.java @@ -43,7 +43,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = PojoStreamingMixedTests.StreamingFunctionApplication.class) +@SpringBootTest(classes = PojoStreamingMixedTests.StreamingFunctionApplication.class, properties = "spring.cloud.function.stream.shared=true") public class PojoStreamingMixedTests { @Autowired @@ -69,8 +69,8 @@ public class PojoStreamingMixedTests { Message result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS); assertThat(result.getPayload()).isInstanceOf(Foo.class); - // 2 subscribers to the same channel so input messages are applied as round robin - assertThat(collector).hasSize(1); + // 2 subscribers to the same channel but input messages are sent to all + assertThat(collector).hasSize(2); } @Test