diff --git a/spring-cloud-function-stream/pom.xml b/spring-cloud-function-stream/pom.xml index 97f6dc4e2..681112122 100644 --- a/spring-cloud-function-stream/pom.xml +++ b/spring-cloud-function-stream/pom.xml @@ -11,7 +11,6 @@ org.springframework.cloud spring-cloud-function-parent 1.0.0.BUILD-SNAPSHOT - .. @@ -40,7 +39,6 @@ org.springframework.cloud spring-cloud-function-context - test ${project.version} diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java index d9023c9b7..b2ca04ecc 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java @@ -31,10 +31,12 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.SpringBootCondition; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.function.context.FunctionInspector; import org.springframework.cloud.function.invoker.AbstractFunctionInvoker; import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.binder.Binder; +import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.cloud.stream.messaging.Source; @@ -42,6 +44,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ConditionContext; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.ConfigurationCondition; +import org.springframework.context.annotation.Lazy; import org.springframework.core.type.AnnotatedTypeMetadata; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -50,6 +53,7 @@ import reactor.core.publisher.Flux; /** * @author Mark Fisher + * @author Marius Bogoevici */ @EnableConfigurationProperties(StreamConfigurationProperties.class) @ConditionalOnClass({ Binder.class, AbstractFunctionInvoker.class }) @@ -82,11 +86,13 @@ public class StreamConfiguration { @Bean @ConditionalOnProperty("spring.cloud.stream.bindings.input.destination") - public AbstractFunctionInvoker invoker(FunctionCatalog registry) { + public AbstractFunctionInvoker invoker(FunctionCatalog registry, FunctionInspector functionInspector, + @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { String name = properties.getEndpoint(); Function function = registry.lookupFunction(name); Assert.notNull(function, "no such function: " + name); - return new StreamListeningFunctionInvoker(function); + return new StreamListeningFunctionInvoker(name, function, functionInspector, + compositeMessageConverterFactory); } } @@ -99,10 +105,12 @@ public class StreamConfiguration { @Bean @ConditionalOnProperty("spring.cloud.stream.bindings.input.destination") - public StreamListeningConsumerInvoker invoker(FunctionCatalog registry) { + public StreamListeningConsumerInvoker invoker(FunctionCatalog registry, FunctionInspector functionInspector, + @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { String name = properties.getEndpoint(); - Consumer> consumer = registry.lookupConsumer(name); - return new StreamListeningConsumerInvoker(consumer); + Consumer> consumer = registry.lookupConsumer(name); + return new StreamListeningConsumerInvoker(name, consumer, functionInspector, + compositeMessageConverterFactory); } } @@ -137,10 +145,8 @@ public class StreamConfiguration { } @Override - public ConditionOutcome getMatchOutcome(ConditionContext context, - AnnotatedTypeMetadata metadata) { - String functionName = context.getEnvironment() - .getProperty("spring.cloud.function.stream.endpoint"); + public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) { + String functionName = context.getEnvironment().getProperty("spring.cloud.function.stream.endpoint"); if (!StringUtils.hasText(functionName)) { return ConditionOutcome.noMatch("no endpoint function name available"); } @@ -151,11 +157,9 @@ public class StreamConfiguration { } Class beanType = context.getBeanFactory().getType(functionName); if (type.isAssignableFrom(beanType)) { - return ConditionOutcome - .match(String.format("bean '%s' is a %s", functionName, type)); + return ConditionOutcome.match(String.format("bean '%s' is a %s", functionName, type)); } - return ConditionOutcome - .noMatch(String.format("bean '%s' is not a %s", functionName, type)); + return ConditionOutcome.noMatch(String.format("bean '%s' is not a %s", functionName, type)); } @Override diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java index e695eeb4b..99fda44d6 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java @@ -17,26 +17,64 @@ package org.springframework.cloud.function.stream; import java.util.function.Consumer; - -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.messaging.Processor; +import java.util.function.Function; import reactor.core.publisher.Flux; +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.cloud.function.context.FunctionInspector; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.MessageConverter; + /** * @author Mark Fisher + * @author Marius Bogoevici */ -public class StreamListeningConsumerInvoker { +public class StreamListeningConsumerInvoker implements SmartInitializingSingleton { - private final Consumer> consumer; + private final Consumer> consumer; - public StreamListeningConsumerInvoker(Consumer> consumer) { + private final String name; + + private final FunctionInspector functionInspector; + + private final CompositeMessageConverterFactory converterFactory; + + private MessageConverter converter; + + private Class inputType; + + public StreamListeningConsumerInvoker(String name, Consumer> consumer, FunctionInspector functionInspector, + CompositeMessageConverterFactory converterFactory) { this.consumer = consumer; + this.name = name; + this.functionInspector = functionInspector; + this.converterFactory = converterFactory; + } + + @Override + public void afterSingletonsInstantiated() { + this.converter = this.converterFactory.getMessageConverterForAllRegistered(); + this.inputType = this.functionInspector.getInputType(this.name); } @StreamListener - public void handle(@Input(Processor.INPUT) Flux input) { - this.consumer.accept(input); + public void handle(@Input(Processor.INPUT) Flux> input) { + this.consumer.accept(input.map(convertInput())); + } + + private Function, Object> convertInput() { + return m -> { + if (this.inputType.isAssignableFrom(m.getPayload().getClass())) { + return m.getPayload(); + } + else { + return converter.fromMessage(m, this.inputType); + } + }; } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java index acfbaf118..75b8c8411 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java @@ -18,31 +18,57 @@ package org.springframework.cloud.function.stream; import java.util.function.Function; +import reactor.core.publisher.Flux; + +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.cloud.function.context.FunctionInspector; import org.springframework.cloud.function.invoker.AbstractFunctionInvoker; import org.springframework.cloud.function.support.FluxFunction; import org.springframework.cloud.function.support.FunctionUtils; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.MessageConverter; import org.springframework.util.Assert; -import reactor.core.publisher.Flux; - /** * @author Mark Fisher + * @author Marius Bogoevici */ -public class StreamListeningFunctionInvoker - extends AbstractFunctionInvoker, Flux> { +public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker, Flux> + implements SmartInitializingSingleton { - public StreamListeningFunctionInvoker(Function function) { + private final String name; + + private final FunctionInspector functionInspector; + + private final CompositeMessageConverterFactory converterFactory; + + private MessageConverter converter; + + private Class inputType; + + public StreamListeningFunctionInvoker(String name, Function function, FunctionInspector functionInspector, + CompositeMessageConverterFactory converterFactory) { super(wrapIfNecessary(function)); + this.name = name; + this.functionInspector = functionInspector; + this.converterFactory = converterFactory; + } + + @Override + public void afterSingletonsInstantiated() { + this.converter = this.converterFactory.getMessageConverterForAllRegistered(); + this.inputType = this.functionInspector.getInputType(this.name); } @StreamListener @Output(Processor.OUTPUT) - public Flux handle(@Input(Processor.INPUT) Flux input) { - return this.doInvoke(input); + public Flux handle(@Input(Processor.INPUT) Flux> input) { + return this.doInvoke(input.map(convertInput())); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -53,4 +79,15 @@ public class StreamListeningFunctionInvoker } return function; } + + private Function, Object> convertInput() { + return m -> { + if (this.inputType.isAssignableFrom(m.getPayload().getClass())) { + return m.getPayload(); + } + else { + return this.converter.fromMessage(m, this.inputType); + } + }; + } } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/PojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/PojoStreamingFunctionTests.java new file mode 100644 index 000000000..69d1a32b4 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/PojoStreamingFunctionTests.java @@ -0,0 +1,86 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Marius Bogoevici + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = PojoStreamingFunctionTests.StreamingFunctionApplication.class, properties = { + "spring.cloud.stream.bindings.input.destination=data-in", + "spring.cloud.stream.bindings.output.destination=data-out", "spring.cloud.function.stream.endpoint=uppercase" }) +public class PojoStreamingFunctionTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Test + public void test() throws Exception { + processor.input().send(MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS); + assertThat(result.getPayload()).isInstanceOf(Foo.class); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Bean + public Function uppercase() { + return f -> new Foo(f.getName().toUpperCase()); + } + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +}