From 0bd71e61e0fc9c89fbe98bc23389cf1a43cecb23 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Tue, 20 Mar 2018 17:28:29 +0000 Subject: [PATCH] Extract common logic in invokers into base class --- .../main/asciidoc/spring-cloud-function.adoc | 4 +- .../AbstractStreamListeningInvoker.java | 236 ++++++++++++++++++ .../StreamListeningConsumerInvoker.java | 157 +----------- .../StreamListeningFunctionInvoker.java | 199 +-------------- .../mixed/PojoStreamingNotSharedTests.java | 144 +++++++++++ 5 files changed, 387 insertions(+), 353 deletions(-) create mode 100644 spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/AbstractStreamListeningInvoker.java create mode 100644 spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingNotSharedTests.java diff --git a/docs/src/main/asciidoc/spring-cloud-function.adoc b/docs/src/main/asciidoc/spring-cloud-function.adoc index d20a5ea50..a43e4ebeb 100644 --- a/docs/src/main/asciidoc/spring-cloud-function.adoc +++ b/docs/src/main/asciidoc/spring-cloud-function.adoc @@ -49,11 +49,13 @@ As the table above shows the behaviour of the endpoint depends on the method and Functions and consumers that are declared with input and output in `Message` will see the request headers on the input messages, and the output message headers will be converted to HTTP headers. +When POSTing text the response format might be different with Spring Boot 2.0 and older versions, depending on the content negotiation (provide content type and accpt headers for the best results). + == Standalone Streaming Applications To send or receive messages from a broker (such as RabbitMQ or Kafka) you can use the `spring-cloud-function-stream` adapter. Add the adapter to your classpath along with the appropriate binder from Spring Cloud Stream. The adapter will bind to the message broker as a `Processor` (input and output streams) unless the user explicitly disables one or the other using `spring.cloud.function.stream.{source,sink}.enabled=false`. -An incoming message is routed to a function (or consumer). If there is only one, then the choice is obvious. If there are multiple functions that can accept an incoming message, the message is inspected to see if there is a `stream_routekey` header containing the name of a function. The header is also added to outgoing messages from a supplier. Messages with no route key can be routed exclusively to a function or consumer by specifying `spring.cloud.function.stream.{processor,sink}.name`. A single supplier can be chosen for output messages (if more than one is available) using the `spring.cloud.function.stream.supplier.name`. Routing headers or function names can be composed using a comma or pipe separated name. +An incoming message is routed to a function (or consumer). If there is only one, then the choice is obvious. If there are multiple functions that can accept an incoming message, the message is inspected to see if there is a `stream_routekey` header containing the name of a function. Routing headers or function names can be composed using a comma- or pipe-separated name. The header is also added to outgoing messages from a supplier. Messages with no route key can be routed exclusively to a function or consumer by specifying `spring.cloud.function.stream.{processor,sink}.name`. If a single function cannot be identified to process an incoming message there will be an error, unless you set `spring.cloud.function.stream.shared=true`, in which case such messages will be sent to all compatible functions. A single supplier can be chosen for output messages from a supplier (if more than one is available) using the `spring.cloud.function.stream.source.name`. NOTE: some binders will fail on startup if the message broker is not available and the function catalog contains suppliers that immediately produce messages when accessed. You can switch off the automatic publishing from suppliers on startup using the `spring.cloud.function.strean.supplier.enabled=false` flag. diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/AbstractStreamListeningInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/AbstractStreamListeningInvoker.java new file mode 100644 index 000000000..61e4c9ae7 --- /dev/null +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/AbstractStreamListeningInvoker.java @@ -0,0 +1,236 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.function.stream.config; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.context.message.MessageUtils; +import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.MessageBuilder; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * @author Dave Syer + * + */ +public abstract class AbstractStreamListeningInvoker + implements SmartInitializingSingleton { + + private final FunctionInspector functionInspector; + + private final FunctionCatalog functionCatalog; + + private final CompositeMessageConverterFactory converterFactory; + + private MessageConverter converter; + + private static final Object UNCONVERTED = new Object(); + + private final String defaultRoute; + + private final Map processors = new HashMap<>(); + + private static final FluxMessageProcessor NOENDPOINT = flux -> Flux.empty(); + + private boolean share; + + public AbstractStreamListeningInvoker(FunctionCatalog functionCatalog, + FunctionInspector functionInspector, + CompositeMessageConverterFactory converterFactory, String defaultRoute, + boolean share) { + this.functionCatalog = functionCatalog; + this.functionInspector = functionInspector; + this.converterFactory = converterFactory; + this.defaultRoute = defaultRoute; + this.share = share; + } + + @Override + public void afterSingletonsInstantiated() { + this.converter = this.converterFactory.getMessageConverterForAllRegistered(); + } + + protected Mono consumer(String name, Flux> flux) { + Consumer consumer = functionCatalog.lookup(Consumer.class, name); + flux = flux.publish().refCount(2); + // The consumer will subscribe to the input flux, so we need to listen separately + consumer.accept(flux.map(message -> convertInput(consumer).apply(message)) + .filter(transformed -> transformed != UNCONVERTED)); + return flux.then(Mono.empty()); + } + + protected Flux> function(String name, Flux> flux) { + Function> function = functionCatalog.lookup(Function.class, name); + return flux.publish(values -> { + Flux result = function + .apply(values.map(message -> convertInput(function).apply(message))); + if (this.functionInspector.isMessage(function)) { + result = result.map(message -> MessageUtils.unpack(function, message)); + } + Flux> aggregate = headers(values); + return aggregate.withLatestFrom(result, + (map, payload) -> message(map, payload)); + }); + } + + private Flux> headers(Flux> flux) { + return flux.map(message -> message.getHeaders()); + } + + private Message message(Map headers, Object result) { + return result instanceof Message + ? MessageBuilder.fromMessage((Message) result) + .copyHeadersIfAbsent(headers).build() + : MessageBuilder.withPayload(result).copyHeadersIfAbsent(headers).build(); + } + + private Function, Object> convertInput(Object function) { + Class inputType = functionInspector.getInputType(function); + return m -> { + if (functionInspector.isMessage(function)) { + return MessageUtils.create(function, convertPayload(inputType, m), + m.getHeaders()); + } + else { + return convertPayload(inputType, m); + } + }; + } + + protected Object convertPayload(Class inputType, Message m) { + Object result; + if (inputType.isAssignableFrom(m.getPayload().getClass())) { + result = m.getPayload(); + } + else { + result = this.converter.fromMessage(m, inputType); + } + if (result == null) { + result = UNCONVERTED; + } + return result; + } + + private Flux> balance(List names, Flux> flux) { + if (names.isEmpty()) { + return Flux.empty(); + } + flux = flux.hide(); + Flux> result = Flux.empty(); + if (names.size() > 1) { + if (this.share) { + flux = flux.publish().refCount(names.size()); + } + else { + return Flux.error(new IllegalStateException( + "Multiple matches and share disabled: " + names)); + } + } + for (String name : names) { + if (functionCatalog.lookup(Consumer.class, name) != null) { + result = result.mergeWith( + consumer(name, flux).thenMany(Flux.>empty())); + } + else { + result = result.mergeWith(function(name, flux)); + } + } + return result; + } + + protected FluxMessageProcessor select(Message input) { + FluxMessageProcessor processor = null; + if (input.getHeaders().containsKey(StreamConfigurationProperties.ROUTE_KEY)) { + String key = (String) input.getHeaders() + .get(StreamConfigurationProperties.ROUTE_KEY); + processor = stash(key); + } + if (processor == null && defaultRoute != null) { + processor = stash(defaultRoute); + } + if (processor == null) { + Set names = new LinkedHashSet<>( + functionCatalog.getNames(Function.class)); + names.addAll(functionCatalog.getNames(Consumer.class)); + List matches = new ArrayList<>(); + if (names.size() == 1) { + String key = names.iterator().next(); + processor = stash(key); + } + else { + for (String candidate : names) { + Object function = functionCatalog.lookup(Function.class, candidate); + if (function == null) { + function = functionCatalog.lookup(Consumer.class, candidate); + } + if (function == null) { + continue; + } + Class inputType = functionInspector.getInputType(function); + Object value = convertPayload(inputType, input); + if (value != null && inputType.isInstance(value)) { + matches.add(candidate); + } + } + if (matches.size() == 1) { + processor = stash(matches.iterator().next()); + } + else { + return flux -> balance(matches, flux); + } + } + } + if (processor == null) { + return NOENDPOINT; + } + return processor; + } + + private FluxMessageProcessor stash(String key) { + if (functionCatalog.lookup(Function.class, key) != null) { + if (!processors.containsKey(key)) { + processors.put(key, flux -> function(key, flux)); + } + return processors.get(key); + } + else if (functionCatalog.lookup(Consumer.class, key) != null) { + if (!processors.containsKey(key)) { + processors.put(key, + flux -> consumer(key, flux).thenMany(Flux.>empty())); + } + return processors.get(key); + } + return null; + } + + interface FluxMessageProcessor { + Flux> process(Flux> flux); + } +} \ No newline at end of file diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningConsumerInvoker.java index d62d92e8a..a68402fef 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningConsumerInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningConsumerInvoker.java @@ -16,66 +16,26 @@ package org.springframework.cloud.function.stream.config; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.FunctionInspector; -import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.messaging.Message; -import org.springframework.messaging.converter.MessageConverter; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * @author Dave Syer */ -public class StreamListeningConsumerInvoker implements SmartInitializingSingleton { - - private final FunctionInspector functionInspector; - - private final FunctionCatalog functionCatalog; - - private final CompositeMessageConverterFactory converterFactory; - - private MessageConverter converter; - - private final String defaultRoute; - - private final Map processors = new HashMap<>(); - - private static final FluxMessageProcessor NOENDPOINT = flux -> Mono.empty(); - - private static final Object UNCONVERTED = new Object(); - - private boolean share; +public class StreamListeningConsumerInvoker extends AbstractStreamListeningInvoker { public StreamListeningConsumerInvoker(FunctionCatalog functionCatalog, FunctionInspector functionInspector, CompositeMessageConverterFactory converterFactory, String defaultRoute, boolean share) { - this.functionCatalog = functionCatalog; - this.functionInspector = functionInspector; - this.converterFactory = converterFactory; - this.defaultRoute = defaultRoute; - this.share = share; - } - - @Override - public void afterSingletonsInstantiated() { - this.converter = this.converterFactory.getMessageConverterForAllRegistered(); + super(functionCatalog, functionInspector, converterFactory, defaultRoute, share); } @StreamListener @@ -84,117 +44,4 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto .subscribe(); } - private Mono consumer(String name, Flux> flux) { - Consumer consumer = functionCatalog.lookup(Consumer.class, name); - consumer.accept(flux.map(message -> convertInput(consumer).apply(message)) - .filter(transformed -> transformed != UNCONVERTED)); - return Mono.empty(); - } - - private Mono balance(List names, Flux> flux) { - if (names.isEmpty()) { - return Mono.empty(); - } - Flux result = Flux.empty(); - if (names.size() > 1) { - if (this.share) { - flux = flux.share(); - } - else { - return Mono.error(new IllegalStateException( - "Multiple matches and share disabled: " + names)); - } - } - for (String name : names) { - result = result.zipWith(consumer(name, flux)); - } - return result.then(); - } - - private FluxMessageProcessor select(Message input) { - String name = null; - if (input.getHeaders().containsKey(StreamConfigurationProperties.ROUTE_KEY)) { - String key = (String) input.getHeaders() - .get(StreamConfigurationProperties.ROUTE_KEY); - name = stash(key); - } - if (name == null && defaultRoute != null) { - name = stash(defaultRoute); - } - if (name == null) { - Set names = new LinkedHashSet<>( - functionCatalog.getNames(Consumer.class)); - List matches = new ArrayList<>(); - if (names.size() == 1) { - String key = names.iterator().next(); - name = stash(key); - } - else { - for (String candidate : names) { - Object function = functionCatalog.lookup(Consumer.class, candidate); - if (function == null) { - continue; - } - Class inputType = functionInspector.getInputType(function); - Object value = this.converter.fromMessage(input, inputType); - if (value != null && inputType.isInstance(value)) { - matches.add(candidate); - } - } - if (matches.size() == 1) { - name = stash(matches.iterator().next()); - } - else { - // TODO: do we really want this? Or maybe warn that it is happening? - return flux -> balance(matches, flux); - } - } - } - if (name == null) { - return NOENDPOINT; - } - return processors.get(name); - } - - private String stash(String key) { - if (functionCatalog.lookup(Consumer.class, key) != null) { - if (!processors.containsKey(key)) { - processors.put(key, flux -> consumer(key, flux)); - } - return key; - } - return null; - } - - private Function, Object> convertInput(Object function) { - Class inputType = functionInspector.getInputType(function); - return m -> { - if (functionInspector.isMessage(function)) { - return MessageUtils.create(function, convertPayload(inputType, m), - m.getHeaders()); - } - else { - return convertPayload(inputType, m); - } - }; - } - - private Object convertPayload(Class inputType, Message m) { - Object result; - if (inputType.isAssignableFrom(m.getPayload().getClass())) { - result = m.getPayload(); - } - else { - result = this.converter.fromMessage(m, inputType); - } - if (result == null) { - result = UNCONVERTED; - } - return result; - } - - interface FluxMessageProcessor { - Mono process(Flux> flux); - } - } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java index 2b31d0ca5..514bb7296 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java @@ -16,19 +16,8 @@ package org.springframework.cloud.function.stream.config; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.FunctionInspector; -import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; @@ -36,8 +25,6 @@ import org.springframework.cloud.stream.converter.CompositeMessageConverterFacto import org.springframework.cloud.stream.messaging.Processor; import org.springframework.cloud.stream.reactive.FluxSender; import org.springframework.messaging.Message; -import org.springframework.messaging.converter.MessageConverter; -import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -46,40 +33,13 @@ import reactor.core.publisher.Mono; * @author Mark Fisher * @author Marius Bogoevici */ -public class StreamListeningFunctionInvoker implements SmartInitializingSingleton { - - private final FunctionInspector functionInspector; - - private final FunctionCatalog functionCatalog; - - private final CompositeMessageConverterFactory converterFactory; - - private MessageConverter converter; - - private final String defaultRoute; - - private final Map processors = new HashMap<>(); - - private static final FluxMessageProcessor NOENDPOINT = flux -> Flux.empty(); - - private static final Object UNCONVERTED = new Object(); - - private boolean share; +public class StreamListeningFunctionInvoker extends AbstractStreamListeningInvoker { public StreamListeningFunctionInvoker(FunctionCatalog functionCatalog, FunctionInspector functionInspector, CompositeMessageConverterFactory converterFactory, String defaultRoute, boolean share) { - this.functionCatalog = functionCatalog; - this.functionInspector = functionInspector; - this.converterFactory = converterFactory; - this.defaultRoute = defaultRoute; - this.share = share; - } - - @Override - public void afterSingletonsInstantiated() { - this.converter = this.converterFactory.getMessageConverterForAllRegistered(); + super(functionCatalog, functionInspector, converterFactory, defaultRoute, share); } @StreamListener @@ -89,159 +49,4 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto input.groupBy(this::select).flatMap(group -> group.key().process(group))); } - private Flux> function(String name, Flux> flux) { - Function> function = functionCatalog.lookup(Function.class, name); - return flux.publish(values -> { - Flux result = function - .apply(values.map(message -> convertInput(function).apply(message))); - if (this.functionInspector.isMessage(function)) { - result = result.map(message -> MessageUtils.unpack(function, message)); - } - Flux> aggregate = headers(values); - return aggregate.withLatestFrom(result, - (map, payload) -> message(map, payload)); - }); - } - - private Flux> headers(Flux> flux) { - return flux.map(message -> message.getHeaders()); - } - - private Message message(Map headers, Object result) { - return result instanceof Message - ? MessageBuilder.fromMessage((Message) result) - .copyHeadersIfAbsent(headers).build() - : MessageBuilder.withPayload(result).copyHeadersIfAbsent(headers).build(); - } - - private Flux> consumer(String name, Flux> flux) { - Consumer consumer = functionCatalog.lookup(Consumer.class, name); - flux = flux.publish().refCount(2); - // The consumer will subscribe to the input flux, so we need to listen separately - consumer.accept(flux.map(message -> convertInput(consumer).apply(message)) - .filter(transformed -> transformed != UNCONVERTED)); - return flux.ignoreElements().flux(); - } - - private Flux> balance(List names, Flux> flux) { - if (names.isEmpty()) { - return Flux.empty(); - } - flux = flux.hide(); - Flux> result = Flux.empty(); - if (names.size() > 1) { - if (this.share) { - flux = flux.publish().refCount(names.size()); - } - else { - return Flux.error(new IllegalStateException( - "Multiple matches and share disabled: " + names)); - } - } - for (String name : names) { - if (functionCatalog.lookup(Consumer.class, name) != null) { - result = result.mergeWith(consumer(name, flux)); - } - else { - result = result.mergeWith(function(name, flux)); - } - } - return result; - } - - private FluxMessageProcessor select(Message input) { - String name = null; - if (input.getHeaders().containsKey(StreamConfigurationProperties.ROUTE_KEY)) { - String key = (String) input.getHeaders() - .get(StreamConfigurationProperties.ROUTE_KEY); - name = stash(key); - } - if (name == null && defaultRoute != null) { - name = stash(defaultRoute); - } - if (name == null) { - Set names = new LinkedHashSet<>( - functionCatalog.getNames(Function.class)); - names.addAll(functionCatalog.getNames(Consumer.class)); - List matches = new ArrayList<>(); - if (names.size() == 1) { - String key = names.iterator().next(); - name = stash(key); - } - else { - for (String candidate : names) { - Object function = functionCatalog.lookup(Function.class, candidate); - if (function == null) { - function = functionCatalog.lookup(Consumer.class, candidate); - } - if (function == null) { - continue; - } - Class inputType = functionInspector.getInputType(function); - Object value = this.converter.fromMessage(input, inputType); - if (value != null && inputType.isInstance(value)) { - matches.add(candidate); - } - } - if (matches.size() == 1) { - name = stash(matches.iterator().next()); - } - else { - return flux -> balance(matches, flux); - } - } - } - if (name == null) { - return NOENDPOINT; - } - return processors.get(name); - } - - private String stash(String key) { - if (functionCatalog.lookup(Function.class, key) != null) { - if (!processors.containsKey(key)) { - processors.put(key, flux -> function(key, flux)); - } - return key; - } - else if (functionCatalog.lookup(Consumer.class, key) != null) { - if (!processors.containsKey(key)) { - processors.put(key, flux -> consumer(key, flux)); - } - return key; - } - return null; - } - - private Function, Object> convertInput(Object function) { - Class inputType = functionInspector.getInputType(function); - return m -> { - if (functionInspector.isMessage(function)) { - return MessageUtils.create(function, convertPayload(inputType, m), - m.getHeaders()); - } - else { - return convertPayload(inputType, m); - } - }; - } - - private Object convertPayload(Class inputType, Message m) { - Object result; - if (inputType.isAssignableFrom(m.getPayload().getClass())) { - result = m.getPayload(); - } - else { - result = this.converter.fromMessage(m, inputType); - } - if (result == null) { - result = UNCONVERTED; - } - return result; - } - - interface FluxMessageProcessor { - Flux> process(Flux> flux); - } - } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingNotSharedTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingNotSharedTests.java new file mode 100644 index 000000000..cc27a12c2 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingNotSharedTests.java @@ -0,0 +1,144 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.mixed; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.function.stream.config.StreamConfigurationProperties; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Marius Bogoevici + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = PojoStreamingNotSharedTests.StreamingFunctionApplication.class) +public class PojoStreamingNotSharedTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Autowired + List collector; + + @Before + public void init() { + collector.clear(); + } + + @Test + public void balance() throws Exception { + processor.input() + .send(MessageBuilder.withPayload("{\"name\":\"hello\"}").build()); + processor.input() + .send(MessageBuilder.withPayload("{\"name\":\"world\"}").build()); + assertThat(messageCollector.forChannel(processor.output())).isEmpty(); + assertThat(collector).hasSize(0); + // There should be an error in the logs (sharing disabled by default) + } + + @Test + public void routing() throws Exception { + processor.input().send(MessageBuilder.withPayload("{\"name\":\"hello\"}") + .setHeader(StreamConfigurationProperties.ROUTE_KEY, "uppercase").build()); + processor.input().send(MessageBuilder.withPayload("{\"name\":\"world\"}") + .setHeader(StreamConfigurationProperties.ROUTE_KEY, "uppercase").build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result.getPayload()).isInstanceOf(Foo.class); + // routing key sends messages to the function, not the consumer + assertThat(collector).hasSize(0); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Bean + public Function uppercase() { + return f -> new Foo(f.getName().toUpperCase()); + } + + @Bean + public List collector() { + return new ArrayList<>(); + } + + @Bean + public Consumer sink(final List list) { + return s -> list.add(s); + } + + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } + + protected static class Bar { + private String name; + + Bar() { + } + + public Bar(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +}