diff --git a/spring-cloud-function-context/pom.xml b/spring-cloud-function-context/pom.xml
index fce40e0f3..7ceaad390 100644
--- a/spring-cloud-function-context/pom.xml
+++ b/spring-cloud-function-context/pom.xml
@@ -28,6 +28,10 @@
org.springframework
spring-web
+
+ org.springframework
+ spring-messaging
+
com.fasterxml.jackson.core
jackson-databind
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java
index b72270a87..dd6691b42 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java
@@ -62,6 +62,7 @@ import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.type.StandardMethodMetadata;
import org.springframework.core.type.classreading.MethodMetadataReadingVisitor;
+import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
@@ -108,6 +109,11 @@ public class ContextFunctionCatalogAutoConfiguration {
this.processor = processor;
}
+ @Override
+ public boolean isMessage(String name) {
+ return processor.isMessage(name);
+ }
+
@Override
public Class> getInputWrapper(String name) {
return processor.findInputWrapper(name);
@@ -472,14 +478,19 @@ public class ContextFunctionCatalogAutoConfiguration {
Type typeArgumentAtIndex = parameterizedType
.getActualTypeArguments()[index];
if (typeArgumentAtIndex instanceof ParameterizedType
- && FunctionInspector.isWrapper(
- ((ParameterizedType) typeArgumentAtIndex).getRawType())
&& !paramType.isWrapper()) {
- param = ((ParameterizedType) typeArgumentAtIndex)
- .getActualTypeArguments()[0];
+ if (FunctionInspector.isWrapper(
+ ((ParameterizedType) typeArgumentAtIndex).getRawType())) {
+ param = ((ParameterizedType) typeArgumentAtIndex)
+ .getActualTypeArguments()[0];
+ param = extractNestedType(paramType, param);
+ }
+ else {
+ param = extractNestedType(paramType, typeArgumentAtIndex);
+ }
}
else {
- param = typeArgumentAtIndex;
+ param = extractNestedType(paramType, typeArgumentAtIndex);
}
}
else {
@@ -488,6 +499,16 @@ public class ContextFunctionCatalogAutoConfiguration {
return param;
}
+ private Type extractNestedType(ParamType paramType, Type param) {
+ if (!paramType.isInnerWrapper()
+ && param.getTypeName().startsWith(Message.class.getName())) {
+ if (param instanceof ParameterizedType) {
+ param = ((ParameterizedType) param).getActualTypeArguments()[0];
+ }
+ }
+ return param;
+ }
+
private Object getField(Object target, String name) {
Field field = ReflectionUtils.findField(target.getClass(), name);
if (field == null) {
@@ -497,6 +518,19 @@ public class ContextFunctionCatalogAutoConfiguration {
return ReflectionUtils.getField(field, target);
}
+ private boolean isMessage(String name) {
+ if (name == null || !registry.containsBeanDefinition(name)) {
+ return false;
+ }
+ return Message.class.isAssignableFrom(findType(name,
+ (AbstractBeanDefinition) registry.getBeanDefinition(name),
+ ParamType.INPUT_INNER_WRAPPER))
+ || Message.class.isAssignableFrom(findType(name,
+ (AbstractBeanDefinition) registry.getBeanDefinition(name),
+ ParamType.OUTPUT_INNER_WRAPPER));
+
+ }
+
private Class> findInputWrapper(String name) {
if (name == null || !registry.containsBeanDefinition(name)) {
return Object.class;
@@ -533,19 +567,25 @@ public class ContextFunctionCatalogAutoConfiguration {
}
static enum ParamType {
- INPUT, OUTPUT, INPUT_WRAPPER, OUTPUT_WRAPPER;
+ INPUT, OUTPUT, INPUT_WRAPPER, OUTPUT_WRAPPER, INPUT_INNER_WRAPPER, OUTPUT_INNER_WRAPPER;
public boolean isOutput() {
- return this == OUTPUT || this == OUTPUT_WRAPPER;
+ return this == OUTPUT || this == OUTPUT_WRAPPER
+ || this == OUTPUT_INNER_WRAPPER;
}
public boolean isInput() {
- return this == INPUT || this == INPUT_WRAPPER;
+ return this == INPUT || this == INPUT_WRAPPER
+ || this == INPUT_INNER_WRAPPER;
}
public boolean isWrapper() {
return this == OUTPUT_WRAPPER || this == INPUT_WRAPPER;
}
+
+ public boolean isInnerWrapper() {
+ return this == OUTPUT_INNER_WRAPPER || this == INPUT_INNER_WRAPPER;
+ }
}
}
}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java
index e6bb10a42..998a12e05 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java
@@ -28,6 +28,8 @@ import reactor.core.publisher.Mono;
*/
public interface FunctionInspector {
+ boolean isMessage(String name);
+
Class> getInputType(String name);
Class> getOutputType(String name);
diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfigurationTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfigurationTests.java
index 12f0d1947..69ecca78b 100644
--- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfigurationTests.java
+++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfigurationTests.java
@@ -39,6 +39,8 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.FileSystemResource;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StreamUtils;
import static org.assertj.core.api.Assertions.assertThat;
@@ -80,6 +82,26 @@ public class ContextFunctionCatalogAutoConfigurationTests {
assertThat(inspector.getInputWrapper("function")).isAssignableFrom(Map.class);
}
+ @Test
+ public void fluxMessageFunction() {
+ create(FluxMessageConfiguration.class);
+ assertThat(context.getBean("function")).isInstanceOf(Function.class);
+ assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class);
+ assertThat(inspector.isMessage("function")).isTrue();
+ assertThat(inspector.getInputType("function")).isAssignableFrom(String.class);
+ assertThat(inspector.getInputWrapper("function")).isAssignableFrom(Flux.class);
+ }
+
+ @Test
+ public void messageFunction() {
+ create(MessageConfiguration.class);
+ assertThat(context.getBean("function")).isInstanceOf(Function.class);
+ assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class);
+ assertThat(inspector.isMessage("function")).isTrue();
+ assertThat(inspector.getInputType("function")).isAssignableFrom(String.class);
+ assertThat(inspector.getInputWrapper("function")).isAssignableFrom(String.class);
+ }
+
@Test
public void genericFluxFunction() {
create(GenericFluxConfiguration.class);
@@ -177,13 +199,14 @@ public class ContextFunctionCatalogAutoConfigurationTests {
@Test
public void compiledConsumer() throws Exception {
create(EmptyConfiguration.class,
- "spring.cloud.function.compile.foos.lambda=" + getClass().getName() + "::set",
+ "spring.cloud.function.compile.foos.lambda=" + getClass().getName()
+ + "::set",
"spring.cloud.function.compile.foos.type=consumer",
"spring.cloud.function.compile.foos.inputType=String");
assertThat(catalog.lookupConsumer("foos")).isInstanceOf(Consumer.class);
assertThat(inspector.getInputWrapper("foos")).isEqualTo(String.class);
@SuppressWarnings("unchecked")
- Consumer consumer = (Consumer)context.getBean("foos");
+ Consumer consumer = (Consumer) context.getBean("foos");
consumer.accept("hello");
assertThat(ContextFunctionCatalogAutoConfigurationTests.value).isEqualTo("hello");
}
@@ -191,12 +214,14 @@ public class ContextFunctionCatalogAutoConfigurationTests {
@Test
public void compiledFluxConsumer() throws Exception {
create(EmptyConfiguration.class,
- "spring.cloud.function.compile.foos.lambda=f -> f.subscribe(" + getClass().getName() + "::set)",
+ "spring.cloud.function.compile.foos.lambda=f -> f.subscribe("
+ + getClass().getName() + "::set)",
"spring.cloud.function.compile.foos.type=consumer");
assertThat(catalog.lookupConsumer("foos")).isInstanceOf(Consumer.class);
assertThat(inspector.getInputWrapper("foos")).isEqualTo(Flux.class);
@SuppressWarnings("unchecked")
- Consumer> consumer = (Consumer>)context.getBean("foos");
+ Consumer> consumer = (Consumer>) context
+ .getBean("foos");
consumer.accept(Flux.just("hello"));
assertThat(ContextFunctionCatalogAutoConfigurationTests.value).isEqualTo("hello");
}
@@ -210,7 +235,7 @@ public class ContextFunctionCatalogAutoConfigurationTests {
catalog = context.getBean(InMemoryFunctionCatalog.class);
inspector = context.getBean(FunctionInspector.class);
}
-
+
public static void set(Object value) {
ContextFunctionCatalogAutoConfigurationTests.value = value.toString();
}
@@ -273,6 +298,25 @@ public class ContextFunctionCatalogAutoConfigurationTests {
}
}
+ @EnableAutoConfiguration
+ @Configuration
+ protected static class FluxMessageConfiguration {
+ @Bean
+ public Function>, Flux>> function() {
+ return flux -> flux.map(m -> MessageBuilder
+ .withPayload(m.getPayload().toUpperCase()).build());
+ }
+ }
+
+ @EnableAutoConfiguration
+ @Configuration
+ protected static class MessageConfiguration {
+ @Bean
+ public Function, Message> function() {
+ return m -> MessageBuilder.withPayload(m.getPayload().toUpperCase()).build();
+ }
+ }
+
@EnableAutoConfiguration
@Configuration
protected static class QualifiedConfiguration {
diff --git a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java
index 4314a30ba..8c05912ab 100644
--- a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java
+++ b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java
@@ -30,7 +30,8 @@ import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.util.MethodInvoker;
-public class FunctionExtractingFunctionCatalog implements FunctionCatalog, FunctionInspector {
+public class FunctionExtractingFunctionCatalog
+ implements FunctionCatalog, FunctionInspector {
private static Log logger = LogFactory
.getLog(FunctionExtractingFunctionCatalog.class);
@@ -65,6 +66,11 @@ public class FunctionExtractingFunctionCatalog implements FunctionCatalog, Funct
return (Supplier) lookup(name, "lookupSupplier");
}
+ @Override
+ public boolean isMessage(String name) {
+ return (Boolean) inspect(name, "isMessage");
+ }
+
@Override
public Class> getInputType(String name) {
return (Class>) inspect(name, "getInputType");
@@ -112,14 +118,14 @@ public class FunctionExtractingFunctionCatalog implements FunctionCatalog, Funct
}
return invoke(FunctionInspector.class, method, arg);
}
-
+
private Object lookup(String name, String method) {
if (logger.isDebugEnabled()) {
logger.debug("Looking up " + name + " with " + method);
}
return invoke(FunctionCatalog.class, method, name);
}
-
+
private Object invoke(Class> type, String method, Object arg) {
for (String id : deployed) {
Object catalog = deployer.getBean(id, type);
diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java
index 291f25125..5f1eab4eb 100644
--- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java
+++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java
@@ -93,7 +93,10 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto
private Function, Object> convertInput(String name) {
Class> inputType = functionInspector.getInputType(name);
return m -> {
- if (inputType.isAssignableFrom(m.getPayload().getClass())) {
+ if (Message.class.isAssignableFrom(inputType)) {
+ return m;
+ }
+ else if (inputType.isAssignableFrom(m.getPayload().getClass())) {
return m.getPayload();
}
else {
diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java
index 417bfe175..eb9853ec1 100644
--- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java
+++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java
@@ -28,7 +28,7 @@ import org.springframework.cloud.stream.converter.CompositeMessageConverterFacto
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
-import org.springframework.util.Assert;
+import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Flux;
@@ -96,12 +96,22 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
private Function, Object> convertInput(String name) {
Class> inputType = functionInspector.getInputType(name);
return m -> {
- if (inputType.isAssignableFrom(m.getPayload().getClass())) {
- return m.getPayload();
+ if (functionInspector.isMessage(name)) {
+ return MessageBuilder.withPayload(convertPayload(name, inputType, m))
+ .copyHeaders(m.getHeaders()).build();
}
else {
- return this.converter.fromMessage(m, inputType);
+ return convertPayload(name, inputType, m);
}
};
}
+
+ private Object convertPayload(String name, Class> inputType, Message> m) {
+ if (inputType.isAssignableFrom(m.getPayload().getClass())) {
+ return m.getPayload();
+ }
+ else {
+ return this.converter.fromMessage(m, inputType);
+ }
+ }
}
diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxMessagePojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxMessagePojoStreamingFunctionTests.java
new file mode 100644
index 000000000..7a77c75f2
--- /dev/null
+++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxMessagePojoStreamingFunctionTests.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.stream.function;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.stream.messaging.Processor;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * @author Marius Bogoevici
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = FluxMessagePojoStreamingFunctionTests.StreamingFunctionApplication.class)
+public class FluxMessagePojoStreamingFunctionTests {
+
+ @Autowired
+ Processor processor;
+
+ @Autowired
+ MessageCollector messageCollector;
+
+ @Test
+ public void test() throws Exception {
+ processor.input().send(
+ MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build());
+ Message> result = messageCollector.forChannel(processor.output()).poll(1000,
+ TimeUnit.MILLISECONDS);
+ assertThat(result.getPayload()).isInstanceOf(Foo.class);
+ }
+
+ @SpringBootApplication
+ public static class StreamingFunctionApplication {
+
+ @Bean
+ public Function>, Flux>> uppercase() {
+ return flux -> flux.map(f -> MessageBuilder
+ .withPayload(new Foo(f.getPayload().getName().toUpperCase()))
+ .build());
+ }
+ }
+
+ protected static class Foo {
+ private String name;
+
+ Foo() {
+ }
+
+ public Foo(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+ }
+}
diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/MessagePojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/MessagePojoStreamingFunctionTests.java
new file mode 100644
index 000000000..0c58578fc
--- /dev/null
+++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/MessagePojoStreamingFunctionTests.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.stream.function;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.stream.messaging.Processor;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author Marius Bogoevici
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = MessagePojoStreamingFunctionTests.StreamingFunctionApplication.class)
+public class MessagePojoStreamingFunctionTests {
+
+ @Autowired
+ Processor processor;
+
+ @Autowired
+ MessageCollector messageCollector;
+
+ @Test
+ public void test() throws Exception {
+ processor.input().send(
+ MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build());
+ Message> result = messageCollector.forChannel(processor.output()).poll(1000,
+ TimeUnit.MILLISECONDS);
+ assertThat(result.getPayload()).isInstanceOf(Foo.class);
+ }
+
+ @SpringBootApplication
+ public static class StreamingFunctionApplication {
+
+ @Bean
+ public Function, Message> uppercase() {
+ return f -> MessageBuilder
+ .withPayload(new Foo(f.getPayload().getName().toUpperCase())).build();
+ }
+ }
+
+ protected static class Foo {
+ private String name;
+
+ Foo() {
+ }
+
+ public Foo(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+ }
+}