Make spring.cloud.function.stream.endpoint optional for stream apps

If there is only one function you shouldn't have to set any configuration
to get a stream app to run. This also implementation supports multiple
functions, trying to guess which one to use based on the type of
the incoming message payload. In principle that could be strategized
as a simple router function (e.g. to look for a header with a function
name).

If there are functions and consumers in the same app, they will
subscribe to the same input channel (and hence by default Spring
Integration will load balance between them). This could also probably
use some more features, to specify the desired behaviour.

If user *does* supply spring.cloud.function.stream.endpoint then it
is used and overrides all other possible routes.
This commit is contained in:
Dave Syer
2017-05-31 08:52:16 +01:00
committed by markfisher
parent 9d0d04f4f6
commit d641aae494
15 changed files with 374 additions and 122 deletions

View File

@@ -25,14 +25,15 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
import org.springframework.boot.bind.RelaxedPropertyResolver;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.invoker.AbstractFunctionInvoker;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.Binder;
@@ -46,17 +47,13 @@ import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.ConfigurationCondition;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
/**
* @author Mark Fisher
* @author Marius Bogoevici
*/
@EnableConfigurationProperties(StreamConfigurationProperties.class)
@ConditionalOnClass({ Binder.class, AbstractFunctionInvoker.class })
@ConditionalOnClass({ Binder.class })
@ConditionalOnProperty(name = "spring.cloud.stream.enabled", havingValue = "true", matchIfMissing = true)
public class StreamConfiguration {
@@ -64,16 +61,12 @@ public class StreamConfiguration {
@EnableBinding(Source.class)
protected static class SupplierConfiguration {
@Autowired
private StreamConfigurationProperties properties;
@Bean
@ConditionalOnProperty("spring.cloud.stream.bindings.output.destination")
public SupplierInvokingMessageProducer<Object> invoker(FunctionCatalog registry) {
String name = properties.getEndpoint();
long interval = properties.getInterval();
Supplier<Flux<Object>> supplier = registry.lookupSupplier(name);
return new SupplierInvokingMessageProducer<Object>(supplier, interval);
public SupplierInvokingMessageProducer<Object> supplierInvoker(
ListableBeanFactory beanFactory, FunctionCatalog registry) {
String[] names = beanFactory.getBeanNamesForType(Supplier.class, false,
false);
return new SupplierInvokingMessageProducer<Object>(registry, names);
}
}
@@ -85,14 +78,14 @@ public class StreamConfiguration {
private StreamConfigurationProperties properties;
@Bean
@ConditionalOnProperty("spring.cloud.stream.bindings.input.destination")
public AbstractFunctionInvoker<?, ?> invoker(FunctionCatalog registry, FunctionInspector functionInspector,
public StreamListeningFunctionInvoker functionInvoker(
ListableBeanFactory beanFactory, FunctionCatalog registry,
FunctionInspector functionInspector,
@Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) {
String name = properties.getEndpoint();
Function<Flux<?>, Flux<?>> function = registry.lookupFunction(name);
Assert.notNull(function, "no such function: " + name);
return new StreamListeningFunctionInvoker(name, function, functionInspector,
compositeMessageConverterFactory);
String[] names = beanFactory.getBeanNamesForType(Function.class, false,
false);
return new StreamListeningFunctionInvoker(registry, functionInspector,
compositeMessageConverterFactory, properties.getEndpoint(), names);
}
}
@@ -104,13 +97,14 @@ public class StreamConfiguration {
private StreamConfigurationProperties properties;
@Bean
@ConditionalOnProperty("spring.cloud.stream.bindings.input.destination")
public StreamListeningConsumerInvoker invoker(FunctionCatalog registry, FunctionInspector functionInspector,
public StreamListeningConsumerInvoker consumerInvoker(
ListableBeanFactory beanFactory, FunctionCatalog registry,
FunctionInspector functionInspector,
@Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) {
String name = properties.getEndpoint();
Consumer<Flux<?>> consumer = registry.lookupConsumer(name);
return new StreamListeningConsumerInvoker(name, consumer, functionInspector,
compositeMessageConverterFactory);
String[] names = beanFactory.getBeanNamesForType(Consumer.class, false,
false);
return new StreamListeningConsumerInvoker(registry, functionInspector,
compositeMessageConverterFactory, properties.getEndpoint(), names);
}
}
@@ -145,21 +139,24 @@ public class StreamConfiguration {
}
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) {
String functionName = context.getEnvironment().getProperty("spring.cloud.function.stream.endpoint");
if (!StringUtils.hasText(functionName)) {
return ConditionOutcome.noMatch("no endpoint function name available");
public ConditionOutcome getMatchOutcome(ConditionContext context,
AnnotatedTypeMetadata metadata) {
if (context.getBeanFactory().getBeanNamesForType(type, false,
false).length > 0) {
String endpoint = new RelaxedPropertyResolver(context.getEnvironment(),
"spring.cloud.function.stream.").getProperty("endpoint");
if (endpoint != null && !type
.isAssignableFrom(context.getBeanFactory().getType(endpoint))) {
return ConditionOutcome.noMatch(String.format(
"explicit endpoint of type other than %s detected", type));
}
return ConditionOutcome
.match(String.format("bean of type %s detected", type));
}
if (functionName.indexOf(',') != -1) {
// for now we will just check the first, but later may support:
// supplier[,function]+ or [function,]+consumer
functionName = functionName.substring(0, functionName.indexOf(','));
}
Class<?> beanType = context.getBeanFactory().getType(functionName);
if (type.isAssignableFrom(beanType)) {
return ConditionOutcome.match(String.format("bean '%s' is a %s", functionName, type));
}
return ConditionOutcome.noMatch(String.format("bean '%s' is not a %s", functionName, type));
return ConditionOutcome
.noMatch(String.format("no bean of type %s detected", type));
}
@Override

View File

@@ -16,64 +16,88 @@
package org.springframework.cloud.function.stream;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import reactor.core.publisher.Flux;
/**
* @author Mark Fisher
* @author Marius Bogoevici
*/
public class StreamListeningConsumerInvoker implements SmartInitializingSingleton {
private final Consumer<Flux<?>> consumer;
private final String name;
private final FunctionInspector functionInspector;
private final CompositeMessageConverterFactory converterFactory;
private MessageConverter converter;
private Class<?> inputType;
private final FunctionCatalog functionCatalog;
public StreamListeningConsumerInvoker(String name, Consumer<Flux<?>> consumer, FunctionInspector functionInspector,
CompositeMessageConverterFactory converterFactory) {
this.consumer = consumer;
this.name = name;
private final String defaultEndpoint;
private final String[] names;
public StreamListeningConsumerInvoker(FunctionCatalog functionCatalog,
FunctionInspector functionInspector,
CompositeMessageConverterFactory converterFactory, String defaultEndpoint,
String... names) {
this.functionCatalog = functionCatalog;
this.functionInspector = functionInspector;
this.converterFactory = converterFactory;
this.defaultEndpoint = defaultEndpoint;
this.names = names;
}
@Override
public void afterSingletonsInstantiated() {
this.converter = this.converterFactory.getMessageConverterForAllRegistered();
this.inputType = this.functionInspector.getInputType(this.name);
}
@StreamListener
public void handle(@Input(Processor.INPUT) Flux<Message<?>> input) {
this.consumer.accept(input.map(convertInput()));
public void handle(@Input(Sink.INPUT) Flux<Message<?>> input) {
input.groupBy(this::select)
.filter(group -> functionCatalog.lookupConsumer(group.key()) != null)
.subscribe(group -> process(group.key(), group));
}
private Function<Message<?>, Object> convertInput() {
private void process(String name, Flux<Message<?>> flux) {
functionCatalog.lookupConsumer(name)
.accept(flux.map(message -> convertInput(name).apply(message)));
}
private String select(Message<?> input) {
String name = defaultEndpoint;
if (name == null) {
for (String candidate : names) {
Class<?> inputType = functionInspector.getInputType(candidate);
if (this.converter.fromMessage(input, inputType) != null) {
name = candidate;
break;
}
}
}
return name;
}
private Function<Message<?>, Object> convertInput(String name) {
Class<?> inputType = functionInspector.getInputType(name);
return m -> {
if (this.inputType.isAssignableFrom(m.getPayload().getClass())) {
if (inputType.isAssignableFrom(m.getPayload().getClass())) {
return m.getPayload();
}
else {
return converter.fromMessage(m, this.inputType);
return this.converter.fromMessage(m, inputType);
}
};
}

View File

@@ -18,11 +18,9 @@ package org.springframework.cloud.function.stream;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.invoker.AbstractFunctionInvoker;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
@@ -30,51 +28,79 @@ import org.springframework.cloud.stream.converter.CompositeMessageConverterFacto
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
/**
* @author Mark Fisher
* @author Marius Bogoevici
*/
public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker<Flux<?>, Flux<?>>
implements SmartInitializingSingleton {
private final String name;
public class StreamListeningFunctionInvoker implements SmartInitializingSingleton {
private final FunctionInspector functionInspector;
private final FunctionCatalog functionCatalog;
private final CompositeMessageConverterFactory converterFactory;
private MessageConverter converter;
private Class<?> inputType;
private final String defaultEndpoint;
public StreamListeningFunctionInvoker(String name, Function<Flux<?>, Flux<?>> function, FunctionInspector functionInspector,
CompositeMessageConverterFactory converterFactory) {
super(function);
this.name = name;
private final String[] names;
public StreamListeningFunctionInvoker(FunctionCatalog functionCatalog,
FunctionInspector functionInspector,
CompositeMessageConverterFactory converterFactory, String defaultEndpoint,
String... names) {
this.functionCatalog = functionCatalog;
this.functionInspector = functionInspector;
this.converterFactory = converterFactory;
this.defaultEndpoint = defaultEndpoint;
this.names = names;
}
@Override
public void afterSingletonsInstantiated() {
this.converter = this.converterFactory.getMessageConverterForAllRegistered();
this.inputType = this.functionInspector.getInputType(this.name);
}
@StreamListener
@Output(Processor.OUTPUT)
public Flux<?> handle(@Input(Processor.INPUT) Flux<Message<?>> input) {
return this.doInvoke(input.map(convertInput()));
return input.groupBy(this::select)
.filter(group -> functionCatalog.lookupFunction(group.key()) != null)
.flatMap(group -> process(group.key(), group));
}
private Function<Message<?>, Object> convertInput() {
private Flux<?> process(String name, Flux<Message<?>> flux) {
return (Flux<?>) functionCatalog.lookupFunction(name)
.apply(flux.map(message -> convertInput(name).apply(message)));
}
private String select(Message<?> input) {
String name = defaultEndpoint;
if (name == null) {
for (String candidate : names) {
Class<?> inputType = functionInspector.getInputType(candidate);
if (this.converter.fromMessage(input, inputType) != null) {
name = candidate;
break;
}
}
}
return name;
}
private Function<Message<?>, Object> convertInput(String name) {
Class<?> inputType = functionInspector.getInputType(name);
return m -> {
if (this.inputType.isAssignableFrom(m.getPayload().getClass())) {
if (inputType.isAssignableFrom(m.getPayload().getClass())) {
return m.getPayload();
}
else {
return this.converter.fromMessage(m, this.inputType);
return this.converter.fromMessage(m, inputType);
}
};
}

View File

@@ -16,11 +16,9 @@
package org.springframework.cloud.function.stream;
import java.time.Duration;
import java.util.function.Supplier;
import org.springframework.cloud.function.support.FluxSupplier;
import org.springframework.cloud.function.support.FunctionUtils;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.support.MessageBuilder;
@@ -33,24 +31,30 @@ import reactor.core.publisher.Flux;
*/
public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
private final Supplier<Flux<T>> supplier;
private final FunctionCatalog functionCatalog;
public SupplierInvokingMessageProducer(Supplier<?> supplier, long interval) {
Assert.notNull(supplier, "Supplier must not be null");
if (!FunctionUtils.isFluxSupplier(supplier)) {
supplier = (interval > 0)
? new FluxSupplier<>(supplier, Duration.ofMillis(interval))
: new FluxSupplier<>(supplier);
}
@SuppressWarnings("unchecked")
Supplier<Flux<T>> unchecked = (Supplier<Flux<T>>) supplier;
this.supplier = unchecked;
private final String[] names;
public SupplierInvokingMessageProducer(FunctionCatalog registry, String... names) {
this.functionCatalog = registry;
this.names = names;
this.setOutputChannelName(Source.OUTPUT);
}
@Override
protected void doStart() {
this.supplier.get()
supplier()
.subscribe(m -> this.sendMessage(MessageBuilder.withPayload(m).build()));
}
private Flux<?> supplier() {
Supplier<Flux<?>> supplier = null;
Flux<?> result = Flux.empty();
for (String name : names) {
supplier = functionCatalog.lookupSupplier(name);
Assert.notNull(supplier, "Supplier must not be null");
result = Flux.merge(result, supplier.get());
}
return result;
}
}

View File

@@ -39,9 +39,7 @@ import reactor.core.publisher.Flux;
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FluxPojoStreamingConsumerTests.StreamingSinkTest.class, properties = {
"spring.cloud.stream.bindings.input.destination=data-in",
"spring.cloud.function.stream.endpoint=sinkConsumer" })
@SpringBootTest(classes = FluxPojoStreamingConsumerTests.StreamingSinkTest.class)
public class FluxPojoStreamingConsumerTests {
@Autowired

View File

@@ -39,9 +39,7 @@ import reactor.core.publisher.Flux;
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FluxStreamingConsumerTests.StreamingSinkTest.class, properties = {
"spring.cloud.stream.bindings.input.destination=data-in",
"spring.cloud.function.stream.endpoint=sinkConsumer" })
@SpringBootTest(classes = FluxStreamingConsumerTests.StreamingSinkTest.class)
public class FluxStreamingConsumerTests {
@Autowired

View File

@@ -37,9 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PojoStreamingConsumerTests.StreamingSinkTest.class, properties = {
"spring.cloud.stream.bindings.input.destination=data-in",
"spring.cloud.function.stream.endpoint=sinkConsumer" })
@SpringBootTest(classes = PojoStreamingConsumerTests.StreamingSinkTest.class)
public class PojoStreamingConsumerTests {
@Autowired

View File

@@ -37,9 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StreamingConsumerTests.StreamingSinkTest.class, properties = {
"spring.cloud.stream.bindings.input.destination=data-in",
"spring.cloud.function.stream.endpoint=sinkConsumer" })
@SpringBootTest(classes = StreamingConsumerTests.StreamingSinkTest.class)
public class StreamingConsumerTests {
@Autowired

View File

@@ -39,9 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FluxPojoStreamingFunctionTests.StreamingFunctionApplication.class, properties = {
"spring.cloud.stream.bindings.input.destination=data-in",
"spring.cloud.stream.bindings.output.destination=data-out", "spring.cloud.function.stream.endpoint=uppercase" })
@SpringBootTest(classes = FluxPojoStreamingFunctionTests.StreamingFunctionApplication.class)
public class FluxPojoStreamingFunctionTests {
@Autowired

View File

@@ -39,10 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FluxStreamingFunctionTests.StreamingFunctionApplication.class, properties = {
"spring.cloud.stream.bindings.input.destination=data-in",
"spring.cloud.stream.bindings.output.destination=data-out",
"spring.cloud.function.stream.endpoint=uppercase" })
@SpringBootTest(classes = FluxStreamingFunctionTests.StreamingFunctionApplication.class)
public class FluxStreamingFunctionTests {
@Autowired

View File

@@ -38,9 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PojoStreamingFunctionTests.StreamingFunctionApplication.class, properties = {
"spring.cloud.stream.bindings.input.destination=data-in",
"spring.cloud.stream.bindings.output.destination=data-out", "spring.cloud.function.stream.endpoint=uppercase" })
@SpringBootTest(classes = PojoStreamingFunctionTests.StreamingFunctionApplication.class)
public class PojoStreamingFunctionTests {
@Autowired

View File

@@ -38,10 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StreamingFunctionTests.StreamingFunctionApplication.class, properties = {
"spring.cloud.stream.bindings.input.destination=data-in",
"spring.cloud.stream.bindings.output.destination=data-out",
"spring.cloud.function.stream.endpoint=uppercase" })
@SpringBootTest(classes = StreamingFunctionTests.StreamingFunctionApplication.class)
public class StreamingFunctionTests {
@Autowired

View File

@@ -0,0 +1,95 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.mixed;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PojoStreamingExplicitEndpointTests.StreamingFunctionApplication.class, properties = {
"spring.cloud.function.stream.endpoint=uppercase",
"logging.level.org.springframework.integration=DEBUG", "debug=TRUE" })
public class PojoStreamingExplicitEndpointTests {
@Autowired
Processor processor;
@Autowired
MessageCollector messageCollector;
@Test
public void test() throws Exception {
processor.input()
.send(MessageBuilder.withPayload("{\"name\":\"hello\"}").build());
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000,
TimeUnit.MILLISECONDS);
assertThat(result.getPayload()).isInstanceOf(Foo.class);
}
@SpringBootApplication
public static class StreamingFunctionApplication {
@Bean
public Function<Foo, Foo> uppercase() {
return f -> new Foo(f.getName().toUpperCase());
}
@Bean
public Supplier<Foo> foos() {
return () -> new Foo("world");
}
}
protected static class Foo {
private String name;
Foo() {
}
public Foo(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

View File

@@ -0,0 +1,126 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.mixed;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PojoStreamingMixedTests.StreamingFunctionApplication.class)
public class PojoStreamingMixedTests {
@Autowired
Processor processor;
@Autowired
MessageCollector messageCollector;
@Autowired
List<Bar> collector;
@Test
public void test() throws Exception {
processor.input()
.send(MessageBuilder.withPayload("{\"name\":\"hello\"}").build());
processor.input()
.send(MessageBuilder.withPayload("{\"name\":\"world\"}").build());
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000,
TimeUnit.MILLISECONDS);
assertThat(result.getPayload()).isInstanceOf(Foo.class);
// 2 subscribers to the same channel so input messages are applied as round robin
assertThat(collector).hasSize(1);
}
@SpringBootApplication
public static class StreamingFunctionApplication {
@Bean
public Function<Foo, Foo> uppercase() {
return f -> new Foo(f.getName().toUpperCase());
}
@Bean
public List<Bar> collector() {
return new ArrayList<>();
}
@Bean
public Consumer<Bar> sink(final List<Bar> list) {
return s -> list.add(s);
}
}
protected static class Foo {
private String name;
Foo() {
}
public Foo(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
protected static class Bar {
private String name;
Bar() {
}
public Bar(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

View File

@@ -37,9 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StreamSupplierTests.StreamingFunctionApplication.class, properties = {
"spring.cloud.stream.bindings.output.destination=data-out",
"spring.cloud.function.stream.endpoint=simpleSupplier" })
@SpringBootTest(classes = StreamSupplierTests.StreamingFunctionApplication.class)
public class StreamSupplierTests {
@Autowired