From 0756dc3394b5381a16076c66460c3ecb0d10f3d0 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Thu, 15 Jun 2017 12:32:46 +0100 Subject: [PATCH] Add support for Message in stream apps The FunctionInspector needs to be able to distinguish between a function of Flux and a function of Flux>. Then it can do the conversion a level below. Only supports Message->Message or POJO->POJO (no mixtures), so there is only one new method in FunctionInspector. No support in the web endpoints yet. But it's probably not so hard to add. --- spring-cloud-function-context/pom.xml | 4 + ...ntextFunctionCatalogAutoConfiguration.java | 56 ++++++++++-- .../function/context/FunctionInspector.java | 2 + ...FunctionCatalogAutoConfigurationTests.java | 54 +++++++++-- .../FunctionExtractingFunctionCatalog.java | 12 ++- .../StreamListeningConsumerInvoker.java | 5 +- .../StreamListeningFunctionInvoker.java | 18 +++- ...FluxMessagePojoStreamingFunctionTests.java | 90 +++++++++++++++++++ .../MessagePojoStreamingFunctionTests.java | 87 ++++++++++++++++++ 9 files changed, 307 insertions(+), 21 deletions(-) create mode 100644 spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxMessagePojoStreamingFunctionTests.java create mode 100644 spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/MessagePojoStreamingFunctionTests.java diff --git a/spring-cloud-function-context/pom.xml b/spring-cloud-function-context/pom.xml index fce40e0f3..7ceaad390 100644 --- a/spring-cloud-function-context/pom.xml +++ b/spring-cloud-function-context/pom.xml @@ -28,6 +28,10 @@ org.springframework spring-web + + org.springframework + spring-messaging + com.fasterxml.jackson.core jackson-databind diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java index b72270a87..dd6691b42 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java @@ -62,6 +62,7 @@ import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.core.io.FileSystemResource; import org.springframework.core.type.StandardMethodMetadata; import org.springframework.core.type.classreading.MethodMetadataReadingVisitor; +import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import org.springframework.util.ClassUtils; import org.springframework.util.ReflectionUtils; @@ -108,6 +109,11 @@ public class ContextFunctionCatalogAutoConfiguration { this.processor = processor; } + @Override + public boolean isMessage(String name) { + return processor.isMessage(name); + } + @Override public Class getInputWrapper(String name) { return processor.findInputWrapper(name); @@ -472,14 +478,19 @@ public class ContextFunctionCatalogAutoConfiguration { Type typeArgumentAtIndex = parameterizedType .getActualTypeArguments()[index]; if (typeArgumentAtIndex instanceof ParameterizedType - && FunctionInspector.isWrapper( - ((ParameterizedType) typeArgumentAtIndex).getRawType()) && !paramType.isWrapper()) { - param = ((ParameterizedType) typeArgumentAtIndex) - .getActualTypeArguments()[0]; + if (FunctionInspector.isWrapper( + ((ParameterizedType) typeArgumentAtIndex).getRawType())) { + param = ((ParameterizedType) typeArgumentAtIndex) + .getActualTypeArguments()[0]; + param = extractNestedType(paramType, param); + } + else { + param = extractNestedType(paramType, typeArgumentAtIndex); + } } else { - param = typeArgumentAtIndex; + param = extractNestedType(paramType, typeArgumentAtIndex); } } else { @@ -488,6 +499,16 @@ public class ContextFunctionCatalogAutoConfiguration { return param; } + private Type extractNestedType(ParamType paramType, Type param) { + if (!paramType.isInnerWrapper() + && param.getTypeName().startsWith(Message.class.getName())) { + if (param instanceof ParameterizedType) { + param = ((ParameterizedType) param).getActualTypeArguments()[0]; + } + } + return param; + } + private Object getField(Object target, String name) { Field field = ReflectionUtils.findField(target.getClass(), name); if (field == null) { @@ -497,6 +518,19 @@ public class ContextFunctionCatalogAutoConfiguration { return ReflectionUtils.getField(field, target); } + private boolean isMessage(String name) { + if (name == null || !registry.containsBeanDefinition(name)) { + return false; + } + return Message.class.isAssignableFrom(findType(name, + (AbstractBeanDefinition) registry.getBeanDefinition(name), + ParamType.INPUT_INNER_WRAPPER)) + || Message.class.isAssignableFrom(findType(name, + (AbstractBeanDefinition) registry.getBeanDefinition(name), + ParamType.OUTPUT_INNER_WRAPPER)); + + } + private Class findInputWrapper(String name) { if (name == null || !registry.containsBeanDefinition(name)) { return Object.class; @@ -533,19 +567,25 @@ public class ContextFunctionCatalogAutoConfiguration { } static enum ParamType { - INPUT, OUTPUT, INPUT_WRAPPER, OUTPUT_WRAPPER; + INPUT, OUTPUT, INPUT_WRAPPER, OUTPUT_WRAPPER, INPUT_INNER_WRAPPER, OUTPUT_INNER_WRAPPER; public boolean isOutput() { - return this == OUTPUT || this == OUTPUT_WRAPPER; + return this == OUTPUT || this == OUTPUT_WRAPPER + || this == OUTPUT_INNER_WRAPPER; } public boolean isInput() { - return this == INPUT || this == INPUT_WRAPPER; + return this == INPUT || this == INPUT_WRAPPER + || this == INPUT_INNER_WRAPPER; } public boolean isWrapper() { return this == OUTPUT_WRAPPER || this == INPUT_WRAPPER; } + + public boolean isInnerWrapper() { + return this == OUTPUT_INNER_WRAPPER || this == INPUT_INNER_WRAPPER; + } } } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java index e6bb10a42..998a12e05 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java @@ -28,6 +28,8 @@ import reactor.core.publisher.Mono; */ public interface FunctionInspector { + boolean isMessage(String name); + Class getInputType(String name); Class getOutputType(String name); diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfigurationTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfigurationTests.java index 12f0d1947..69ecca78b 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfigurationTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfigurationTests.java @@ -39,6 +39,8 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.core.io.FileSystemResource; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StreamUtils; import static org.assertj.core.api.Assertions.assertThat; @@ -80,6 +82,26 @@ public class ContextFunctionCatalogAutoConfigurationTests { assertThat(inspector.getInputWrapper("function")).isAssignableFrom(Map.class); } + @Test + public void fluxMessageFunction() { + create(FluxMessageConfiguration.class); + assertThat(context.getBean("function")).isInstanceOf(Function.class); + assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class); + assertThat(inspector.isMessage("function")).isTrue(); + assertThat(inspector.getInputType("function")).isAssignableFrom(String.class); + assertThat(inspector.getInputWrapper("function")).isAssignableFrom(Flux.class); + } + + @Test + public void messageFunction() { + create(MessageConfiguration.class); + assertThat(context.getBean("function")).isInstanceOf(Function.class); + assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class); + assertThat(inspector.isMessage("function")).isTrue(); + assertThat(inspector.getInputType("function")).isAssignableFrom(String.class); + assertThat(inspector.getInputWrapper("function")).isAssignableFrom(String.class); + } + @Test public void genericFluxFunction() { create(GenericFluxConfiguration.class); @@ -177,13 +199,14 @@ public class ContextFunctionCatalogAutoConfigurationTests { @Test public void compiledConsumer() throws Exception { create(EmptyConfiguration.class, - "spring.cloud.function.compile.foos.lambda=" + getClass().getName() + "::set", + "spring.cloud.function.compile.foos.lambda=" + getClass().getName() + + "::set", "spring.cloud.function.compile.foos.type=consumer", "spring.cloud.function.compile.foos.inputType=String"); assertThat(catalog.lookupConsumer("foos")).isInstanceOf(Consumer.class); assertThat(inspector.getInputWrapper("foos")).isEqualTo(String.class); @SuppressWarnings("unchecked") - Consumer consumer = (Consumer)context.getBean("foos"); + Consumer consumer = (Consumer) context.getBean("foos"); consumer.accept("hello"); assertThat(ContextFunctionCatalogAutoConfigurationTests.value).isEqualTo("hello"); } @@ -191,12 +214,14 @@ public class ContextFunctionCatalogAutoConfigurationTests { @Test public void compiledFluxConsumer() throws Exception { create(EmptyConfiguration.class, - "spring.cloud.function.compile.foos.lambda=f -> f.subscribe(" + getClass().getName() + "::set)", + "spring.cloud.function.compile.foos.lambda=f -> f.subscribe(" + + getClass().getName() + "::set)", "spring.cloud.function.compile.foos.type=consumer"); assertThat(catalog.lookupConsumer("foos")).isInstanceOf(Consumer.class); assertThat(inspector.getInputWrapper("foos")).isEqualTo(Flux.class); @SuppressWarnings("unchecked") - Consumer> consumer = (Consumer>)context.getBean("foos"); + Consumer> consumer = (Consumer>) context + .getBean("foos"); consumer.accept(Flux.just("hello")); assertThat(ContextFunctionCatalogAutoConfigurationTests.value).isEqualTo("hello"); } @@ -210,7 +235,7 @@ public class ContextFunctionCatalogAutoConfigurationTests { catalog = context.getBean(InMemoryFunctionCatalog.class); inspector = context.getBean(FunctionInspector.class); } - + public static void set(Object value) { ContextFunctionCatalogAutoConfigurationTests.value = value.toString(); } @@ -273,6 +298,25 @@ public class ContextFunctionCatalogAutoConfigurationTests { } } + @EnableAutoConfiguration + @Configuration + protected static class FluxMessageConfiguration { + @Bean + public Function>, Flux>> function() { + return flux -> flux.map(m -> MessageBuilder + .withPayload(m.getPayload().toUpperCase()).build()); + } + } + + @EnableAutoConfiguration + @Configuration + protected static class MessageConfiguration { + @Bean + public Function, Message> function() { + return m -> MessageBuilder.withPayload(m.getPayload().toUpperCase()).build(); + } + } + @EnableAutoConfiguration @Configuration protected static class QualifiedConfiguration { diff --git a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java index 4314a30ba..8c05912ab 100644 --- a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java +++ b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java @@ -30,7 +30,8 @@ import org.springframework.cloud.function.context.FunctionInspector; import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.util.MethodInvoker; -public class FunctionExtractingFunctionCatalog implements FunctionCatalog, FunctionInspector { +public class FunctionExtractingFunctionCatalog + implements FunctionCatalog, FunctionInspector { private static Log logger = LogFactory .getLog(FunctionExtractingFunctionCatalog.class); @@ -65,6 +66,11 @@ public class FunctionExtractingFunctionCatalog implements FunctionCatalog, Funct return (Supplier) lookup(name, "lookupSupplier"); } + @Override + public boolean isMessage(String name) { + return (Boolean) inspect(name, "isMessage"); + } + @Override public Class getInputType(String name) { return (Class) inspect(name, "getInputType"); @@ -112,14 +118,14 @@ public class FunctionExtractingFunctionCatalog implements FunctionCatalog, Funct } return invoke(FunctionInspector.class, method, arg); } - + private Object lookup(String name, String method) { if (logger.isDebugEnabled()) { logger.debug("Looking up " + name + " with " + method); } return invoke(FunctionCatalog.class, method, name); } - + private Object invoke(Class type, String method, Object arg) { for (String id : deployed) { Object catalog = deployer.getBean(id, type); diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java index 291f25125..5f1eab4eb 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java @@ -93,7 +93,10 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto private Function, Object> convertInput(String name) { Class inputType = functionInspector.getInputType(name); return m -> { - if (inputType.isAssignableFrom(m.getPayload().getClass())) { + if (Message.class.isAssignableFrom(inputType)) { + return m; + } + else if (inputType.isAssignableFrom(m.getPayload().getClass())) { return m.getPayload(); } else { diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java index 417bfe175..eb9853ec1 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java @@ -28,7 +28,7 @@ import org.springframework.cloud.stream.converter.CompositeMessageConverterFacto import org.springframework.cloud.stream.messaging.Processor; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; -import org.springframework.util.Assert; +import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; @@ -96,12 +96,22 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto private Function, Object> convertInput(String name) { Class inputType = functionInspector.getInputType(name); return m -> { - if (inputType.isAssignableFrom(m.getPayload().getClass())) { - return m.getPayload(); + if (functionInspector.isMessage(name)) { + return MessageBuilder.withPayload(convertPayload(name, inputType, m)) + .copyHeaders(m.getHeaders()).build(); } else { - return this.converter.fromMessage(m, inputType); + return convertPayload(name, inputType, m); } }; } + + private Object convertPayload(String name, Class inputType, Message m) { + if (inputType.isAssignableFrom(m.getPayload().getClass())) { + return m.getPayload(); + } + else { + return this.converter.fromMessage(m, inputType); + } + } } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxMessagePojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxMessagePojoStreamingFunctionTests.java new file mode 100644 index 000000000..7a77c75f2 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxMessagePojoStreamingFunctionTests.java @@ -0,0 +1,90 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; + +/** + * @author Marius Bogoevici + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = FluxMessagePojoStreamingFunctionTests.StreamingFunctionApplication.class) +public class FluxMessagePojoStreamingFunctionTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Test + public void test() throws Exception { + processor.input().send( + MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result.getPayload()).isInstanceOf(Foo.class); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Bean + public Function>, Flux>> uppercase() { + return flux -> flux.map(f -> MessageBuilder + .withPayload(new Foo(f.getPayload().getName().toUpperCase())) + .build()); + } + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/MessagePojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/MessagePojoStreamingFunctionTests.java new file mode 100644 index 000000000..0c58578fc --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/MessagePojoStreamingFunctionTests.java @@ -0,0 +1,87 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Marius Bogoevici + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = MessagePojoStreamingFunctionTests.StreamingFunctionApplication.class) +public class MessagePojoStreamingFunctionTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Test + public void test() throws Exception { + processor.input().send( + MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result.getPayload()).isInstanceOf(Foo.class); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Bean + public Function, Message> uppercase() { + return f -> MessageBuilder + .withPayload(new Foo(f.getPayload().getName().toUpperCase())).build(); + } + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +}