diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java index 59f535177..3dabb9732 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java @@ -25,14 +25,15 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.springframework.beans.factory.ListableBeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionOutcome; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.SpringBootCondition; +import org.springframework.boot.bind.RelaxedPropertyResolver; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.context.FunctionInspector; -import org.springframework.cloud.function.invoker.AbstractFunctionInvoker; import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.binder.Binder; @@ -46,17 +47,13 @@ import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.ConfigurationCondition; import org.springframework.context.annotation.Lazy; import org.springframework.core.type.AnnotatedTypeMetadata; -import org.springframework.util.Assert; -import org.springframework.util.StringUtils; - -import reactor.core.publisher.Flux; /** * @author Mark Fisher * @author Marius Bogoevici */ @EnableConfigurationProperties(StreamConfigurationProperties.class) -@ConditionalOnClass({ Binder.class, AbstractFunctionInvoker.class }) +@ConditionalOnClass({ Binder.class }) @ConditionalOnProperty(name = "spring.cloud.stream.enabled", havingValue = "true", matchIfMissing = true) public class StreamConfiguration { @@ -64,16 +61,12 @@ public class StreamConfiguration { @EnableBinding(Source.class) protected static class SupplierConfiguration { - @Autowired - private StreamConfigurationProperties properties; - @Bean - @ConditionalOnProperty("spring.cloud.stream.bindings.output.destination") - public SupplierInvokingMessageProducer invoker(FunctionCatalog registry) { - String name = properties.getEndpoint(); - long interval = properties.getInterval(); - Supplier> supplier = registry.lookupSupplier(name); - return new SupplierInvokingMessageProducer(supplier, interval); + public SupplierInvokingMessageProducer supplierInvoker( + ListableBeanFactory beanFactory, FunctionCatalog registry) { + String[] names = beanFactory.getBeanNamesForType(Supplier.class, false, + false); + return new SupplierInvokingMessageProducer(registry, names); } } @@ -85,14 +78,14 @@ public class StreamConfiguration { private StreamConfigurationProperties properties; @Bean - @ConditionalOnProperty("spring.cloud.stream.bindings.input.destination") - public AbstractFunctionInvoker invoker(FunctionCatalog registry, FunctionInspector functionInspector, + public StreamListeningFunctionInvoker functionInvoker( + ListableBeanFactory beanFactory, FunctionCatalog registry, + FunctionInspector functionInspector, @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { - String name = properties.getEndpoint(); - Function, Flux> function = registry.lookupFunction(name); - Assert.notNull(function, "no such function: " + name); - return new StreamListeningFunctionInvoker(name, function, functionInspector, - compositeMessageConverterFactory); + String[] names = beanFactory.getBeanNamesForType(Function.class, false, + false); + return new StreamListeningFunctionInvoker(registry, functionInspector, + compositeMessageConverterFactory, properties.getEndpoint(), names); } } @@ -104,13 +97,14 @@ public class StreamConfiguration { private StreamConfigurationProperties properties; @Bean - @ConditionalOnProperty("spring.cloud.stream.bindings.input.destination") - public StreamListeningConsumerInvoker invoker(FunctionCatalog registry, FunctionInspector functionInspector, + public StreamListeningConsumerInvoker consumerInvoker( + ListableBeanFactory beanFactory, FunctionCatalog registry, + FunctionInspector functionInspector, @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { - String name = properties.getEndpoint(); - Consumer> consumer = registry.lookupConsumer(name); - return new StreamListeningConsumerInvoker(name, consumer, functionInspector, - compositeMessageConverterFactory); + String[] names = beanFactory.getBeanNamesForType(Consumer.class, false, + false); + return new StreamListeningConsumerInvoker(registry, functionInspector, + compositeMessageConverterFactory, properties.getEndpoint(), names); } } @@ -145,21 +139,24 @@ public class StreamConfiguration { } @Override - public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) { - String functionName = context.getEnvironment().getProperty("spring.cloud.function.stream.endpoint"); - if (!StringUtils.hasText(functionName)) { - return ConditionOutcome.noMatch("no endpoint function name available"); + public ConditionOutcome getMatchOutcome(ConditionContext context, + AnnotatedTypeMetadata metadata) { + if (context.getBeanFactory().getBeanNamesForType(type, false, + false).length > 0) { + String endpoint = new RelaxedPropertyResolver(context.getEnvironment(), + "spring.cloud.function.stream.").getProperty("endpoint"); + if (endpoint != null && !type + .isAssignableFrom(context.getBeanFactory().getType(endpoint))) { + return ConditionOutcome.noMatch(String.format( + "explicit endpoint of type other than %s detected", type)); + } + return ConditionOutcome + .match(String.format("bean of type %s detected", type)); + } - if (functionName.indexOf(',') != -1) { - // for now we will just check the first, but later may support: - // supplier[,function]+ or [function,]+consumer - functionName = functionName.substring(0, functionName.indexOf(',')); - } - Class beanType = context.getBeanFactory().getType(functionName); - if (type.isAssignableFrom(beanType)) { - return ConditionOutcome.match(String.format("bean '%s' is a %s", functionName, type)); - } - return ConditionOutcome.noMatch(String.format("bean '%s' is not a %s", functionName, type)); + return ConditionOutcome + .noMatch(String.format("no bean of type %s detected", type)); + } @Override diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java index 99fda44d6..291f25125 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java @@ -16,64 +16,88 @@ package org.springframework.cloud.function.stream; -import java.util.function.Consumer; import java.util.function.Function; -import reactor.core.publisher.Flux; - import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.cloud.function.context.FunctionInspector; +import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; -import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; +import reactor.core.publisher.Flux; + /** * @author Mark Fisher * @author Marius Bogoevici */ public class StreamListeningConsumerInvoker implements SmartInitializingSingleton { - private final Consumer> consumer; - - private final String name; - private final FunctionInspector functionInspector; private final CompositeMessageConverterFactory converterFactory; private MessageConverter converter; - private Class inputType; + private final FunctionCatalog functionCatalog; - public StreamListeningConsumerInvoker(String name, Consumer> consumer, FunctionInspector functionInspector, - CompositeMessageConverterFactory converterFactory) { - this.consumer = consumer; - this.name = name; + private final String defaultEndpoint; + + private final String[] names; + + public StreamListeningConsumerInvoker(FunctionCatalog functionCatalog, + FunctionInspector functionInspector, + CompositeMessageConverterFactory converterFactory, String defaultEndpoint, + String... names) { + this.functionCatalog = functionCatalog; this.functionInspector = functionInspector; this.converterFactory = converterFactory; + this.defaultEndpoint = defaultEndpoint; + this.names = names; } @Override public void afterSingletonsInstantiated() { this.converter = this.converterFactory.getMessageConverterForAllRegistered(); - this.inputType = this.functionInspector.getInputType(this.name); } @StreamListener - public void handle(@Input(Processor.INPUT) Flux> input) { - this.consumer.accept(input.map(convertInput())); + public void handle(@Input(Sink.INPUT) Flux> input) { + input.groupBy(this::select) + .filter(group -> functionCatalog.lookupConsumer(group.key()) != null) + .subscribe(group -> process(group.key(), group)); } - private Function, Object> convertInput() { + private void process(String name, Flux> flux) { + functionCatalog.lookupConsumer(name) + .accept(flux.map(message -> convertInput(name).apply(message))); + } + + private String select(Message input) { + String name = defaultEndpoint; + if (name == null) { + for (String candidate : names) { + Class inputType = functionInspector.getInputType(candidate); + if (this.converter.fromMessage(input, inputType) != null) { + name = candidate; + break; + } + } + } + return name; + } + + private Function, Object> convertInput(String name) { + Class inputType = functionInspector.getInputType(name); return m -> { - if (this.inputType.isAssignableFrom(m.getPayload().getClass())) { + if (inputType.isAssignableFrom(m.getPayload().getClass())) { return m.getPayload(); } else { - return converter.fromMessage(m, this.inputType); + return this.converter.fromMessage(m, inputType); } }; } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java index 5b4ddf93b..417bfe175 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java @@ -18,11 +18,9 @@ package org.springframework.cloud.function.stream; import java.util.function.Function; -import reactor.core.publisher.Flux; - import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.cloud.function.context.FunctionInspector; -import org.springframework.cloud.function.invoker.AbstractFunctionInvoker; +import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; @@ -30,51 +28,79 @@ import org.springframework.cloud.stream.converter.CompositeMessageConverterFacto import org.springframework.cloud.stream.messaging.Processor; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; +import org.springframework.util.Assert; + +import reactor.core.publisher.Flux; /** * @author Mark Fisher * @author Marius Bogoevici */ -public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker, Flux> - implements SmartInitializingSingleton { - - private final String name; +public class StreamListeningFunctionInvoker implements SmartInitializingSingleton { private final FunctionInspector functionInspector; + private final FunctionCatalog functionCatalog; + private final CompositeMessageConverterFactory converterFactory; private MessageConverter converter; - private Class inputType; + private final String defaultEndpoint; - public StreamListeningFunctionInvoker(String name, Function, Flux> function, FunctionInspector functionInspector, - CompositeMessageConverterFactory converterFactory) { - super(function); - this.name = name; + private final String[] names; + + public StreamListeningFunctionInvoker(FunctionCatalog functionCatalog, + FunctionInspector functionInspector, + CompositeMessageConverterFactory converterFactory, String defaultEndpoint, + String... names) { + this.functionCatalog = functionCatalog; this.functionInspector = functionInspector; this.converterFactory = converterFactory; + this.defaultEndpoint = defaultEndpoint; + this.names = names; } @Override public void afterSingletonsInstantiated() { this.converter = this.converterFactory.getMessageConverterForAllRegistered(); - this.inputType = this.functionInspector.getInputType(this.name); } @StreamListener @Output(Processor.OUTPUT) public Flux handle(@Input(Processor.INPUT) Flux> input) { - return this.doInvoke(input.map(convertInput())); + return input.groupBy(this::select) + .filter(group -> functionCatalog.lookupFunction(group.key()) != null) + .flatMap(group -> process(group.key(), group)); } - private Function, Object> convertInput() { + private Flux process(String name, Flux> flux) { + return (Flux) functionCatalog.lookupFunction(name) + .apply(flux.map(message -> convertInput(name).apply(message))); + } + + private String select(Message input) { + String name = defaultEndpoint; + if (name == null) { + for (String candidate : names) { + Class inputType = functionInspector.getInputType(candidate); + if (this.converter.fromMessage(input, inputType) != null) { + name = candidate; + break; + } + } + } + return name; + } + + private Function, Object> convertInput(String name) { + Class inputType = functionInspector.getInputType(name); return m -> { - if (this.inputType.isAssignableFrom(m.getPayload().getClass())) { + if (inputType.isAssignableFrom(m.getPayload().getClass())) { return m.getPayload(); } else { - return this.converter.fromMessage(m, this.inputType); + return this.converter.fromMessage(m, inputType); } }; } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java index 7a6329143..8ff92e03e 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java @@ -16,11 +16,9 @@ package org.springframework.cloud.function.stream; -import java.time.Duration; import java.util.function.Supplier; -import org.springframework.cloud.function.support.FluxSupplier; -import org.springframework.cloud.function.support.FunctionUtils; +import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.messaging.support.MessageBuilder; @@ -33,24 +31,30 @@ import reactor.core.publisher.Flux; */ public class SupplierInvokingMessageProducer extends MessageProducerSupport { - private final Supplier> supplier; + private final FunctionCatalog functionCatalog; - public SupplierInvokingMessageProducer(Supplier supplier, long interval) { - Assert.notNull(supplier, "Supplier must not be null"); - if (!FunctionUtils.isFluxSupplier(supplier)) { - supplier = (interval > 0) - ? new FluxSupplier<>(supplier, Duration.ofMillis(interval)) - : new FluxSupplier<>(supplier); - } - @SuppressWarnings("unchecked") - Supplier> unchecked = (Supplier>) supplier; - this.supplier = unchecked; + private final String[] names; + + public SupplierInvokingMessageProducer(FunctionCatalog registry, String... names) { + this.functionCatalog = registry; + this.names = names; this.setOutputChannelName(Source.OUTPUT); } @Override protected void doStart() { - this.supplier.get() + supplier() .subscribe(m -> this.sendMessage(MessageBuilder.withPayload(m).build())); } + + private Flux supplier() { + Supplier> supplier = null; + Flux result = Flux.empty(); + for (String name : names) { + supplier = functionCatalog.lookupSupplier(name); + Assert.notNull(supplier, "Supplier must not be null"); + result = Flux.merge(result, supplier.get()); + } + return result; + } } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxPojoStreamingConsumerTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxPojoStreamingConsumerTests.java index f4ef80798..0d8b82be5 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxPojoStreamingConsumerTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxPojoStreamingConsumerTests.java @@ -39,9 +39,7 @@ import reactor.core.publisher.Flux; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = FluxPojoStreamingConsumerTests.StreamingSinkTest.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.function.stream.endpoint=sinkConsumer" }) +@SpringBootTest(classes = FluxPojoStreamingConsumerTests.StreamingSinkTest.class) public class FluxPojoStreamingConsumerTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxStreamingConsumerTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxStreamingConsumerTests.java index a774f354f..86b099571 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxStreamingConsumerTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxStreamingConsumerTests.java @@ -39,9 +39,7 @@ import reactor.core.publisher.Flux; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = FluxStreamingConsumerTests.StreamingSinkTest.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.function.stream.endpoint=sinkConsumer" }) +@SpringBootTest(classes = FluxStreamingConsumerTests.StreamingSinkTest.class) public class FluxStreamingConsumerTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/PojoStreamingConsumerTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/PojoStreamingConsumerTests.java index 1ed498001..511a705f0 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/PojoStreamingConsumerTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/PojoStreamingConsumerTests.java @@ -37,9 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = PojoStreamingConsumerTests.StreamingSinkTest.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.function.stream.endpoint=sinkConsumer" }) +@SpringBootTest(classes = PojoStreamingConsumerTests.StreamingSinkTest.class) public class PojoStreamingConsumerTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/StreamingConsumerTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/StreamingConsumerTests.java index 7564e46ac..2bd1a7abf 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/StreamingConsumerTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/StreamingConsumerTests.java @@ -37,9 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = StreamingConsumerTests.StreamingSinkTest.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.function.stream.endpoint=sinkConsumer" }) +@SpringBootTest(classes = StreamingConsumerTests.StreamingSinkTest.class) public class StreamingConsumerTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java index 728a21342..b8ac7a7a7 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java @@ -39,9 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = FluxPojoStreamingFunctionTests.StreamingFunctionApplication.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.stream.bindings.output.destination=data-out", "spring.cloud.function.stream.endpoint=uppercase" }) +@SpringBootTest(classes = FluxPojoStreamingFunctionTests.StreamingFunctionApplication.class) public class FluxPojoStreamingFunctionTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxStreamingFunctionTests.java index 902175248..2e283287e 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxStreamingFunctionTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxStreamingFunctionTests.java @@ -39,10 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = FluxStreamingFunctionTests.StreamingFunctionApplication.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.stream.bindings.output.destination=data-out", - "spring.cloud.function.stream.endpoint=uppercase" }) +@SpringBootTest(classes = FluxStreamingFunctionTests.StreamingFunctionApplication.class) public class FluxStreamingFunctionTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/PojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/PojoStreamingFunctionTests.java index 69d1a32b4..dd1922452 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/PojoStreamingFunctionTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/PojoStreamingFunctionTests.java @@ -38,9 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = PojoStreamingFunctionTests.StreamingFunctionApplication.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.stream.bindings.output.destination=data-out", "spring.cloud.function.stream.endpoint=uppercase" }) +@SpringBootTest(classes = PojoStreamingFunctionTests.StreamingFunctionApplication.class) public class PojoStreamingFunctionTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/StreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/StreamingFunctionTests.java index 552f9e7ce..ca597ee4e 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/StreamingFunctionTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/StreamingFunctionTests.java @@ -38,10 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = StreamingFunctionTests.StreamingFunctionApplication.class, properties = { - "spring.cloud.stream.bindings.input.destination=data-in", - "spring.cloud.stream.bindings.output.destination=data-out", - "spring.cloud.function.stream.endpoint=uppercase" }) +@SpringBootTest(classes = StreamingFunctionTests.StreamingFunctionApplication.class) public class StreamingFunctionTests { @Autowired diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingExplicitEndpointTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingExplicitEndpointTests.java new file mode 100644 index 000000000..5340cec94 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingExplicitEndpointTests.java @@ -0,0 +1,95 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.mixed; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Marius Bogoevici + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = PojoStreamingExplicitEndpointTests.StreamingFunctionApplication.class, properties = { + "spring.cloud.function.stream.endpoint=uppercase", + "logging.level.org.springframework.integration=DEBUG", "debug=TRUE" }) +public class PojoStreamingExplicitEndpointTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Test + public void test() throws Exception { + processor.input() + .send(MessageBuilder.withPayload("{\"name\":\"hello\"}").build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result.getPayload()).isInstanceOf(Foo.class); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Bean + public Function uppercase() { + return f -> new Foo(f.getName().toUpperCase()); + } + + @Bean + public Supplier foos() { + return () -> new Foo("world"); + } + + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingMixedTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingMixedTests.java new file mode 100644 index 000000000..37e237b3b --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingMixedTests.java @@ -0,0 +1,126 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.mixed; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Marius Bogoevici + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = PojoStreamingMixedTests.StreamingFunctionApplication.class) +public class PojoStreamingMixedTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Autowired + List collector; + + @Test + public void test() throws Exception { + processor.input() + .send(MessageBuilder.withPayload("{\"name\":\"hello\"}").build()); + processor.input() + .send(MessageBuilder.withPayload("{\"name\":\"world\"}").build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result.getPayload()).isInstanceOf(Foo.class); + // 2 subscribers to the same channel so input messages are applied as round robin + assertThat(collector).hasSize(1); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Bean + public Function uppercase() { + return f -> new Foo(f.getName().toUpperCase()); + } + + @Bean + public List collector() { + return new ArrayList<>(); + } + + @Bean + public Consumer sink(final List list) { + return s -> list.add(s); + } + + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } + + protected static class Bar { + private String name; + + Bar() { + } + + public Bar(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/supplier/StreamSupplierTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/supplier/StreamSupplierTests.java index af4a14bb1..0ffc11985 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/supplier/StreamSupplierTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/supplier/StreamSupplierTests.java @@ -37,9 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = StreamSupplierTests.StreamingFunctionApplication.class, properties = { - "spring.cloud.stream.bindings.output.destination=data-out", - "spring.cloud.function.stream.endpoint=simpleSupplier" }) +@SpringBootTest(classes = StreamSupplierTests.StreamingFunctionApplication.class) public class StreamSupplierTests { @Autowired