From d641aae4947d6b65572252a85150fea9503753bb Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Wed, 31 May 2017 08:52:16 +0100 Subject: [PATCH] Make spring.cloud.function.stream.endpoint optional for stream apps If there is only one function you shouldn't have to set any configuration to get a stream app to run. This also implementation supports multiple functions, trying to guess which one to use based on the type of the incoming message payload. In principle that could be strategized as a simple router function (e.g. to look for a header with a function name). If there are functions and consumers in the same app, they will subscribe to the same input channel (and hence by default Spring Integration will load balance between them). This could also probably use some more features, to specify the desired behaviour. If user *does* supply spring.cloud.function.stream.endpoint then it is used and overrides all other possible routes. --- .../function/stream/StreamConfiguration.java | 81 ++++++----- .../StreamListeningConsumerInvoker.java | 62 ++++++--- .../StreamListeningFunctionInvoker.java | 60 ++++++--- .../SupplierInvokingMessageProducer.java | 34 ++--- .../FluxPojoStreamingConsumerTests.java | 4 +- .../consumer/FluxStreamingConsumerTests.java | 4 +- .../consumer/PojoStreamingConsumerTests.java | 4 +- .../consumer/StreamingConsumerTests.java | 4 +- .../FluxPojoStreamingFunctionTests.java | 4 +- .../function/FluxStreamingFunctionTests.java | 5 +- .../function/PojoStreamingFunctionTests.java | 4 +- .../function/StreamingFunctionTests.java | 5 +- .../PojoStreamingExplicitEndpointTests.java | 95 +++++++++++++ .../stream/mixed/PojoStreamingMixedTests.java | 126 ++++++++++++++++++ .../stream/supplier/StreamSupplierTests.java | 4 +- 15 files changed, 374 insertions(+), 122 deletions(-) create mode 100644 spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingExplicitEndpointTests.java create mode 100644 spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingMixedTests.java diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java index 59f535177..3dabb9732 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java @@ -25,14 +25,15 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.springframework.beans.factory.ListableBeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionOutcome; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.SpringBootCondition; +import org.springframework.boot.bind.RelaxedPropertyResolver; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.context.FunctionInspector; -import org.springframework.cloud.function.invoker.AbstractFunctionInvoker; import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.binder.Binder; @@ -46,17 +47,13 @@ import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.ConfigurationCondition; import org.springframework.context.annotation.Lazy; import org.springframework.core.type.AnnotatedTypeMetadata; -import org.springframework.util.Assert; -import org.springframework.util.StringUtils; - -import reactor.core.publisher.Flux; /** * @author Mark Fisher * @author Marius Bogoevici */ @EnableConfigurationProperties(StreamConfigurationProperties.class) -@ConditionalOnClass({ Binder.class, AbstractFunctionInvoker.class }) +@ConditionalOnClass({ Binder.class }) @ConditionalOnProperty(name = "spring.cloud.stream.enabled", havingValue = "true", matchIfMissing = true) public class StreamConfiguration { @@ -64,16 +61,12 @@ public class StreamConfiguration { @EnableBinding(Source.class) protected static class SupplierConfiguration { - @Autowired - private StreamConfigurationProperties properties; - @Bean - @ConditionalOnProperty("spring.cloud.stream.bindings.output.destination") - public SupplierInvokingMessageProducer invoker(FunctionCatalog registry) { - String name = properties.getEndpoint(); - long interval = properties.getInterval(); - Supplier> supplier = registry.lookupSupplier(name); - return new SupplierInvokingMessageProducer(supplier, interval); + public SupplierInvokingMessageProducer supplierInvoker( + ListableBeanFactory beanFactory, FunctionCatalog registry) { + String[] names = beanFactory.getBeanNamesForType(Supplier.class, false, + false); + return new SupplierInvokingMessageProducer(registry, names); } } @@ -85,14 +78,14 @@ public class StreamConfiguration { private StreamConfigurationProperties properties; @Bean - @ConditionalOnProperty("spring.cloud.stream.bindings.input.destination") - public AbstractFunctionInvoker invoker(FunctionCatalog registry, FunctionInspector functionInspector, + public StreamListeningFunctionInvoker functionInvoker( + ListableBeanFactory beanFactory, FunctionCatalog registry, + FunctionInspector functionInspector, @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { - String name = properties.getEndpoint(); - Function, Flux> function = registry.lookupFunction(name); - Assert.notNull(function, "no such function: " + name); - return new StreamListeningFunctionInvoker(name, function, functionInspector, - compositeMessageConverterFactory); + String[] names = beanFactory.getBeanNamesForType(Function.class, false, + false); + return new StreamListeningFunctionInvoker(registry, functionInspector, + compositeMessageConverterFactory, properties.getEndpoint(), names); } } @@ -104,13 +97,14 @@ public class StreamConfiguration { private StreamConfigurationProperties properties; @Bean - @ConditionalOnProperty("spring.cloud.stream.bindings.input.destination") - public StreamListeningConsumerInvoker invoker(FunctionCatalog registry, FunctionInspector functionInspector, + public StreamListeningConsumerInvoker consumerInvoker( + ListableBeanFactory beanFactory, FunctionCatalog registry, + FunctionInspector functionInspector, @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { - String name = properties.getEndpoint(); - Consumer> consumer = registry.lookupConsumer(name); - return new StreamListeningConsumerInvoker(name, consumer, functionInspector, - compositeMessageConverterFactory); + String[] names = beanFactory.getBeanNamesForType(Consumer.class, false, + false); + return new StreamListeningConsumerInvoker(registry, functionInspector, + compositeMessageConverterFactory, properties.getEndpoint(), names); } } @@ -145,21 +139,24 @@ public class StreamConfiguration { } @Override - public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) { - String functionName = context.getEnvironment().getProperty("spring.cloud.function.stream.endpoint"); - if (!StringUtils.hasText(functionName)) { - return ConditionOutcome.noMatch("no endpoint function name available"); + public ConditionOutcome getMatchOutcome(ConditionContext context, + AnnotatedTypeMetadata metadata) { + if (context.getBeanFactory().getBeanNamesForType(type, false, + false).length > 0) { + String endpoint = new RelaxedPropertyResolver(context.getEnvironment(), + "spring.cloud.function.stream.").getProperty("endpoint"); + if (endpoint != null && !type + .isAssignableFrom(context.getBeanFactory().getType(endpoint))) { + return ConditionOutcome.noMatch(String.format( + "explicit endpoint of type other than %s detected", type)); + } + return ConditionOutcome + .match(String.format("bean of type %s detected", type)); + } - if (functionName.indexOf(',') != -1) { - // for now we will just check the first, but later may support: - // supplier[,function]+ or [function,]+consumer - functionName = functionName.substring(0, functionName.indexOf(',')); - } - Class beanType = context.getBeanFactory().getType(functionName); - if (type.isAssignableFrom(beanType)) { - return ConditionOutcome.match(String.format("bean '%s' is a %s", functionName, type)); - } - return ConditionOutcome.noMatch(String.format("bean '%s' is not a %s", functionName, type)); + return ConditionOutcome + .noMatch(String.format("no bean of type %s detected", type)); + } @Override diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java index 99fda44d6..291f25125 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java @@ -16,64 +16,88 @@ package org.springframework.cloud.function.stream; -import java.util.function.Consumer; import java.util.function.Function; -import reactor.core.publisher.Flux; - import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.cloud.function.context.FunctionInspector; +import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; -import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; +import reactor.core.publisher.Flux; + /** * @author Mark Fisher * @author Marius Bogoevici */ public class StreamListeningConsumerInvoker implements SmartInitializingSingleton { - private final Consumer> consumer; - - private final String name; - private final FunctionInspector functionInspector; private final CompositeMessageConverterFactory converterFactory; private MessageConverter converter; - private Class inputType; + private final FunctionCatalog functionCatalog; - public StreamListeningConsumerInvoker(String name, Consumer> consumer, FunctionInspector functionInspector, - CompositeMessageConverterFactory converterFactory) { - this.consumer = consumer; - this.name = name; + private final String defaultEndpoint; + + private final String[] names; + + public StreamListeningConsumerInvoker(FunctionCatalog functionCatalog, + FunctionInspector functionInspector, + CompositeMessageConverterFactory converterFactory, String defaultEndpoint, + String... names) { + this.functionCatalog = functionCatalog; this.functionInspector = functionInspector; this.converterFactory = converterFactory; + this.defaultEndpoint = defaultEndpoint; + this.names = names; } @Override public void afterSingletonsInstantiated() { this.converter = this.converterFactory.getMessageConverterForAllRegistered(); - this.inputType = this.functionInspector.getInputType(this.name); } @StreamListener - public void handle(@Input(Processor.INPUT) Flux> input) { - this.consumer.accept(input.map(convertInput())); + public void handle(@Input(Sink.INPUT) Flux> input) { + input.groupBy(this::select) + .filter(group -> functionCatalog.lookupConsumer(group.key()) != null) + .subscribe(group -> process(group.key(), group)); } - private Function, Object> convertInput() { + private void process(String name, Flux> flux) { + functionCatalog.lookupConsumer(name) + .accept(flux.map(message -> convertInput(name).apply(message))); + } + + private String select(Message input) { + String name = defaultEndpoint; + if (name == null) { + for (String candidate : names) { + Class inputType = functionInspector.getInputType(candidate); + if (this.converter.fromMessage(input, inputType) != null) { + name = candidate; + break; + } + } + } + return name; + } + + private Function, Object> convertInput(String name) { + Class inputType = functionInspector.getInputType(name); return m -> { - if (this.inputType.isAssignableFrom(m.getPayload().getClass())) { + if (inputType.isAssignableFrom(m.getPayload().getClass())) { return m.getPayload(); } else { - return converter.fromMessage(m, this.inputType); + return this.converter.fromMessage(m, inputType); } }; } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java index 5b4ddf93b..417bfe175 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java @@ -18,11 +18,9 @@ package org.springframework.cloud.function.stream; import java.util.function.Function; -import reactor.core.publisher.Flux; - import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.cloud.function.context.FunctionInspector; -import org.springframework.cloud.function.invoker.AbstractFunctionInvoker; +import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; @@ -30,51 +28,79 @@ import org.springframework.cloud.stream.converter.CompositeMessageConverterFacto import org.springframework.cloud.stream.messaging.Processor; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; +import org.springframework.util.Assert; + +import reactor.core.publisher.Flux; /** * @author Mark Fisher * @author Marius Bogoevici */ -public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker, Flux> - implements SmartInitializingSingleton { - - private final String name; +public class StreamListeningFunctionInvoker implements SmartInitializingSingleton { private final FunctionInspector functionInspector; + private final FunctionCatalog functionCatalog; + private final CompositeMessageConverterFactory converterFactory; private MessageConverter converter; - private Class inputType; + private final String defaultEndpoint; - public StreamListeningFunctionInvoker(String name, Function, Flux> function, FunctionInspector functionInspector, - CompositeMessageConverterFactory converterFactory) { - super(function); - this.name = name; + private final String[] names; + + public StreamListeningFunctionInvoker(FunctionCatalog functionCatalog, + FunctionInspector functionInspector, + CompositeMessageConverterFactory converterFactory, String defaultEndpoint, + String... names) { + this.functionCatalog = functionCatalog; this.functionInspector = functionInspector; this.converterFactory = converterFactory; + this.defaultEndpoint = defaultEndpoint; + this.names = names; } @Override public void afterSingletonsInstantiated() { this.converter = this.converterFactory.getMessageConverterForAllRegistered(); - this.inputType = this.functionInspector.getInputType(this.name); } @StreamListener @Output(Processor.OUTPUT) public Flux handle(@Input(Processor.INPUT) Flux> input) { - return this.doInvoke(input.map(convertInput())); + return input.groupBy(this::select) + .filter(group -> functionCatalog.lookupFunction(group.key()) != null) + .flatMap(group -> process(group.key(), group)); } - private Function, Object> convertInput() { + private Flux process(String name, Flux> flux) { + return (Flux) functionCatalog.lookupFunction(name) + .apply(flux.map(message -> convertInput(name).apply(message))); + } + + private String select(Message input) { + String name = defaultEndpoint; + if (name == null) { + for (String candidate : names) { + Class inputType = functionInspector.getInputType(candidate); + if (this.converter.fromMessage(input, inputType) != null) { + name = candidate; + break; + } + } + } + return name; + } + + private Function, Object> convertInput(String name) { + Class inputType = functionInspector.getInputType(name); return m -> { - if (this.inputType.isAssignableFrom(m.getPayload().getClass())) { + if (inputType.isAssignableFrom(m.getPayload().getClass())) { return m.getPayload(); } else { - return this.converter.fromMessage(m, this.inputType); + return this.converter.fromMessage(m, inputType); } }; } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java index 7a6329143..8ff92e03e 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java @@ -16,11 +16,9 @@ package org.springframework.cloud.function.stream; -import java.time.Duration; import java.util.function.Supplier; -import org.springframework.cloud.function.support.FluxSupplier; -import org.springframework.cloud.function.support.FunctionUtils; +import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.messaging.support.MessageBuilder; @@ -33,24 +31,30 @@ import reactor.core.publisher.Flux; */ public class SupplierInvokingMessageProducer extends MessageProducerSupport { - private final Supplier> supplier; + private final FunctionCatalog functionCatalog; - public SupplierInvokingMessageProducer(Supplier supplier, long interval) { - Assert.notNull(supplier, "Supplier must not be null"); - if (!FunctionUtils.isFluxSupplier(supplier)) { - supplier = (interval > 0) - ? new FluxSupplier<>(supplier, Duration.ofMillis(interval)) - : new FluxSupplier<>(supplier); - } - @SuppressWarnings("unchecked") - Supplier> unchecked = (Supplier>) supplier; - this.supplier = unchecked; + private final String[] names; + + public SupplierInvokingMessageProducer(FunctionCatalog registry, String... names) { + this.functionCatalog = registry; + this.names = names; this.setOutputChannelName(Source.OUTPUT); } @Override protected void doStart() { - this.supplier.get() + supplier() .subscribe(m -> this.sendMessage(MessageBuilder.withPayload(m).build())); } + + private Flux supplier() { + Supplier> supplier = null; + Flux result = Flux.empty(); + for (String name : names) { + supplier = functionCatalog.lookupSupplier(name); + Assert.notNull(supplier, "Supplier must not be null"); + result = Flux.merge(result, supplier.get()); + } + return result; + } } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxPojoStreamingConsumerTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxPojoStreamingConsumerTests.java index f4ef80798..0d8b82be5 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxPojoStreamingConsumerTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxPojoStreamingConsumerTests.java @@ -39,9 +39,7 @@ import reactor.core.publisher.Flux; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = FluxPojoStreamingConsumerTests.StreamingSinkTest.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.function.stream.endpoint=sinkConsumer" }) +@SpringBootTest(classes = FluxPojoStreamingConsumerTests.StreamingSinkTest.class) public class FluxPojoStreamingConsumerTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxStreamingConsumerTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxStreamingConsumerTests.java index a774f354f..86b099571 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxStreamingConsumerTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxStreamingConsumerTests.java @@ -39,9 +39,7 @@ import reactor.core.publisher.Flux; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = FluxStreamingConsumerTests.StreamingSinkTest.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.function.stream.endpoint=sinkConsumer" }) +@SpringBootTest(classes = FluxStreamingConsumerTests.StreamingSinkTest.class) public class FluxStreamingConsumerTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/PojoStreamingConsumerTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/PojoStreamingConsumerTests.java index 1ed498001..511a705f0 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/PojoStreamingConsumerTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/PojoStreamingConsumerTests.java @@ -37,9 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = PojoStreamingConsumerTests.StreamingSinkTest.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.function.stream.endpoint=sinkConsumer" }) +@SpringBootTest(classes = PojoStreamingConsumerTests.StreamingSinkTest.class) public class PojoStreamingConsumerTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/StreamingConsumerTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/StreamingConsumerTests.java index 7564e46ac..2bd1a7abf 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/StreamingConsumerTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/StreamingConsumerTests.java @@ -37,9 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = StreamingConsumerTests.StreamingSinkTest.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.function.stream.endpoint=sinkConsumer" }) +@SpringBootTest(classes = StreamingConsumerTests.StreamingSinkTest.class) public class StreamingConsumerTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java index 728a21342..b8ac7a7a7 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java @@ -39,9 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = FluxPojoStreamingFunctionTests.StreamingFunctionApplication.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.stream.bindings.output.destination=data-out", "spring.cloud.function.stream.endpoint=uppercase" }) +@SpringBootTest(classes = FluxPojoStreamingFunctionTests.StreamingFunctionApplication.class) public class FluxPojoStreamingFunctionTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxStreamingFunctionTests.java index 902175248..2e283287e 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxStreamingFunctionTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxStreamingFunctionTests.java @@ -39,10 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = FluxStreamingFunctionTests.StreamingFunctionApplication.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.stream.bindings.output.destination=data-out", - "spring.cloud.function.stream.endpoint=uppercase" }) +@SpringBootTest(classes = FluxStreamingFunctionTests.StreamingFunctionApplication.class) public class FluxStreamingFunctionTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/PojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/PojoStreamingFunctionTests.java index 69d1a32b4..dd1922452 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/PojoStreamingFunctionTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/PojoStreamingFunctionTests.java @@ -38,9 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = PojoStreamingFunctionTests.StreamingFunctionApplication.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.stream.bindings.output.destination=data-out", "spring.cloud.function.stream.endpoint=uppercase" }) +@SpringBootTest(classes = PojoStreamingFunctionTests.StreamingFunctionApplication.class) public class PojoStreamingFunctionTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/StreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/StreamingFunctionTests.java index 552f9e7ce..ca597ee4e 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/StreamingFunctionTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/StreamingFunctionTests.java @@ -38,10 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = StreamingFunctionTests.StreamingFunctionApplication.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.stream.bindings.output.destination=data-out", - "spring.cloud.function.stream.endpoint=uppercase" }) +@SpringBootTest(classes = StreamingFunctionTests.StreamingFunctionApplication.class) public class StreamingFunctionTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingExplicitEndpointTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingExplicitEndpointTests.java new file mode 100644 index 000000000..5340cec94 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingExplicitEndpointTests.java @@ -0,0 +1,95 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.mixed; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Marius Bogoevici + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = PojoStreamingExplicitEndpointTests.StreamingFunctionApplication.class, properties = { + "spring.cloud.function.stream.endpoint=uppercase", + "logging.level.org.springframework.integration=DEBUG", "debug=TRUE" }) +public class PojoStreamingExplicitEndpointTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Test + public void test() throws Exception { + processor.input() + .send(MessageBuilder.withPayload("{\"name\":\"hello\"}").build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result.getPayload()).isInstanceOf(Foo.class); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Bean + public Function uppercase() { + return f -> new Foo(f.getName().toUpperCase()); + } + + @Bean + public Supplier foos() { + return () -> new Foo("world"); + } + + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingMixedTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingMixedTests.java new file mode 100644 index 000000000..37e237b3b --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingMixedTests.java @@ -0,0 +1,126 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.mixed; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Marius Bogoevici + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = PojoStreamingMixedTests.StreamingFunctionApplication.class) +public class PojoStreamingMixedTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Autowired + List collector; + + @Test + public void test() throws Exception { + processor.input() + .send(MessageBuilder.withPayload("{\"name\":\"hello\"}").build()); + processor.input() + .send(MessageBuilder.withPayload("{\"name\":\"world\"}").build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result.getPayload()).isInstanceOf(Foo.class); + // 2 subscribers to the same channel so input messages are applied as round robin + assertThat(collector).hasSize(1); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Bean + public Function uppercase() { + return f -> new Foo(f.getName().toUpperCase()); + } + + @Bean + public List collector() { + return new ArrayList<>(); + } + + @Bean + public Consumer sink(final List list) { + return s -> list.add(s); + } + + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } + + protected static class Bar { + private String name; + + Bar() { + } + + public Bar(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/supplier/StreamSupplierTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/supplier/StreamSupplierTests.java index af4a14bb1..0ffc11985 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/supplier/StreamSupplierTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/supplier/StreamSupplierTests.java @@ -37,9 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = StreamSupplierTests.StreamingFunctionApplication.class, properties = { - "spring.cloud.stream.bindings.output.destination=data-out", - "spring.cloud.function.stream.endpoint=simpleSupplier" }) +@SpringBootTest(classes = StreamSupplierTests.StreamingFunctionApplication.class) public class StreamSupplierTests { @Autowired