diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 2edc0a31f..0e3c81e57 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -457,6 +457,8 @@ public class ContextFunctionCatalogAutoConfiguration { if (target instanceof Supplier) { type = Supplier.class; findType(target, ParamType.OUTPUT); + findType(target, ParamType.OUTPUT_WRAPPER); + isMessage(target); registration.target(target((Supplier) target, key)); for (String name : registration.getNames()) { this.suppliers.put(name, (Supplier) registration.getTarget()); @@ -465,6 +467,8 @@ public class ContextFunctionCatalogAutoConfiguration { else if (target instanceof Consumer) { type = Consumer.class; findType(target, ParamType.INPUT); + findType(target, ParamType.INPUT_WRAPPER); + isMessage(target); // cache wrapper types registration.target(target((Consumer) target, key)); for (String name : registration.getNames()) { this.consumers.put(name, (Consumer) registration.getTarget()); @@ -474,6 +478,8 @@ public class ContextFunctionCatalogAutoConfiguration { type = Function.class; findType(target, ParamType.INPUT); findType(target, ParamType.OUTPUT); + findType(target, ParamType.INPUT_WRAPPER); + findType(target, ParamType.OUTPUT_WRAPPER); isMessage(target); // cache wrapper types registration.target(target((Function) target, key)); for (String name : registration.getNames()) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java new file mode 100644 index 000000000..375772b47 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java @@ -0,0 +1,105 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.message; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Map; + +import org.springframework.cloud.function.core.FluxWrapper; +import org.springframework.cloud.function.core.Isolated; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.ClassUtils; +import org.springframework.util.ReflectionUtils; + +/** + * @author Dave Syer + * + */ +public abstract class MessageUtils { + + /** + * Create a message for the handler. If the handler is a wrapper for a function in an + * isolated class loader, then the message will be created with the target class + * loader (therefore the {@link Message} class must be on the classpath of the target + * class loader). + * + * @param handler the function that will be applied to the message + * @param payload the payload of the message + * @param headers the headers for the message + * @return a message with the correct class loader + */ + public static Object create(Object handler, Object payload, + Map headers) { + if (handler instanceof FluxWrapper) { + handler = ((FluxWrapper) handler).getTarget(); + } + if (!(handler instanceof Isolated)) { + return MessageBuilder.withPayload(payload).copyHeaders(headers).build(); + } + ClassLoader classLoader = ((Isolated) handler).getClassLoader(); + Class builder = ClassUtils.resolveClassName(MessageBuilder.class.getName(), + classLoader); + Method withPayload = ClassUtils.getMethod(builder, "withPayload", Object.class); + Method copyHeaders = ClassUtils.getMethod(builder, "copyHeaders", Map.class); + Method build = ClassUtils.getMethod(builder, "build"); + Object instance = ReflectionUtils.invokeMethod(withPayload, null, payload); + ReflectionUtils.invokeMethod(copyHeaders, instance, headers); + return ReflectionUtils.invokeMethod(build, instance); + } + + /** + * Convert a message from the handler into one that is safe to consume in the caller's + * class laoder. If the handler is a wrapper for a function in an isolated class + * loader, then the message will be created with the target class loader (therefore + * the {@link Message} class must be on the classpath of the target class loader). + * + * @param handler the function that generated the message + * @param message the message to convert + * @return a message with the correct class loader + */ + public static Message unpack(Object handler, Object message) { + if (handler instanceof FluxWrapper) { + handler = ((FluxWrapper) handler).getTarget(); + } + if (!(handler instanceof Isolated)) { + if (message instanceof Message) { + return (Message) message; + } + return MessageBuilder.withPayload(message).build(); + } + ClassLoader classLoader = ((Isolated) handler).getClassLoader(); + Class type = ClassUtils.resolveClassName(Message.class.getName(), classLoader); + Object payload; + Map headers; + if (type.isAssignableFrom(message.getClass())) { + Method getPayload = ClassUtils.getMethod(type, "getPayload"); + Method getHeaders = ClassUtils.getMethod(type, "getHeaders"); + payload = ReflectionUtils.invokeMethod(getPayload, message); + @SuppressWarnings("unchecked") + Map map = (Map) ReflectionUtils + .invokeMethod(getHeaders, message); + headers = map; + } else { + payload = message; + headers = Collections.emptyMap(); + } + return MessageBuilder.withPayload(payload).copyHeaders(headers).build(); + } + +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java index 6c47f0fb6..4970e3bbf 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java @@ -28,7 +28,7 @@ import reactor.core.publisher.Flux; * * @param input type of target consumer */ -public class FluxConsumer implements Consumer> { +public class FluxConsumer implements Consumer>, FluxWrapper> { private final Consumer consumer; @@ -36,6 +36,11 @@ public class FluxConsumer implements Consumer> { this.consumer = consumer; } + @Override + public Consumer getTarget() { + return this.consumer; + } + @Override public void accept(Flux input) { input.subscribe(t -> consumer.accept(t)); diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java index fe5c53212..74bba239b 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java @@ -29,13 +29,18 @@ import reactor.core.publisher.Flux; * @param input type of target function * @param output type of target function */ -public class FluxFunction implements Function, Flux> { +public class FluxFunction implements Function, Flux>, FluxWrapper> { private final Function function; public FluxFunction(Function function) { this.function = function; } + + @Override + public Function getTarget() { + return this.function; + } @Override public Flux apply(Flux input) { diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxSupplier.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxSupplier.java index 6a843ae00..f736d0143 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxSupplier.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxSupplier.java @@ -33,7 +33,7 @@ import reactor.core.publisher.Flux; * * @param output type of target supplier */ -public class FluxSupplier implements Supplier> { +public class FluxSupplier implements Supplier>, FluxWrapper> { private final Supplier supplier; @@ -42,12 +42,17 @@ public class FluxSupplier implements Supplier> { public FluxSupplier(Supplier supplier) { this(supplier, null); } - + public FluxSupplier(Supplier supplier, Duration period) { this.supplier = supplier; this.period = period; } + @Override + public Supplier getTarget() { + return this.supplier; + } + @Override @SuppressWarnings({ "unchecked", "rawtypes" }) public Flux get() { diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxWrapper.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxWrapper.java new file mode 100644 index 000000000..9062b56bc --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxWrapper.java @@ -0,0 +1,27 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.core; + +/** + * @author Dave Syer + * + */ +public interface FluxWrapper { + + T getTarget(); + +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/Isolated.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/Isolated.java new file mode 100644 index 000000000..be93e1a04 --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/Isolated.java @@ -0,0 +1,26 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.core; + +/** + * @author Dave Syer + * + */ +public interface Isolated { + + ClassLoader getClassLoader(); +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedConsumer.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedConsumer.java index bb43d5cdc..91cb76aea 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedConsumer.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedConsumer.java @@ -24,18 +24,25 @@ import org.springframework.util.ClassUtils; * @author Dave Syer * */ -public class IsolatedConsumer implements Consumer { +public class IsolatedConsumer implements Consumer, Isolated { private final Consumer consumer; + private final ClassLoader classLoader; public IsolatedConsumer(Consumer consumer) { this.consumer = consumer; + this.classLoader = consumer.getClass().getClassLoader(); + } + + @Override + public ClassLoader getClassLoader() { + return this.classLoader; } @Override public void accept(T item) { ClassLoader context = ClassUtils - .overrideThreadContextClassLoader(consumer.getClass().getClassLoader()); + .overrideThreadContextClassLoader(this.classLoader); try { consumer.accept(item); } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedFunction.java index a5665ca9d..d257d1af3 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedFunction.java @@ -24,20 +24,27 @@ import org.springframework.util.ClassUtils; * @author Dave Syer * */ -public class IsolatedFunction implements Function { +public class IsolatedFunction implements Function, Isolated { private final Function function; + private final ClassLoader classLoader; public IsolatedFunction(Function function) { this.function = function; + this.classLoader = function.getClass().getClassLoader(); + } + + @Override + public ClassLoader getClassLoader() { + return this.classLoader; } @Override public T apply(S item) { ClassLoader context = ClassUtils - .overrideThreadContextClassLoader(function.getClass().getClassLoader()); + .overrideThreadContextClassLoader(this.classLoader); try { - return function.apply(item); + return this.function.apply(item); } finally { ClassUtils.overrideThreadContextClassLoader(context); diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java index f42eb06c4..57cacab58 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java @@ -24,18 +24,25 @@ import org.springframework.util.ClassUtils; * @author Dave Syer * */ -public class IsolatedSupplier implements Supplier { +public class IsolatedSupplier implements Supplier, Isolated { private final Supplier supplier; + private final ClassLoader classLoader; public IsolatedSupplier(Supplier supplier) { this.supplier = supplier; + this.classLoader = supplier.getClass().getClassLoader(); + } + + @Override + public ClassLoader getClassLoader() { + return this.classLoader; } @Override public T get() { ClassLoader context = ClassUtils - .overrideThreadContextClassLoader(supplier.getClass().getClassLoader()); + .overrideThreadContextClassLoader(this.classLoader); try { return supplier.get(); } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java index 65767b954..09e88ff2a 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java @@ -27,6 +27,7 @@ import java.util.function.Function; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.core.FunctionCatalog; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; @@ -91,6 +92,9 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto return flux.publish(values -> { Flux result = function .apply(values.map(message -> convertInput(function).apply(message))); + if (this.functionInspector.isMessage(function)) { + result = result.map(message -> MessageUtils.unpack(function, message)); + } Flux> aggregate = headers(values); return result.withLatestFrom(aggregate, (p, m) -> message(p, m)); }); @@ -141,7 +145,7 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto .get(StreamConfigurationProperties.ROUTE_KEY); name = stash(key); } - if (name==null && defaultRoute != null) { + if (name == null && defaultRoute != null) { name = stash(defaultRoute); } if (name == null) { @@ -155,10 +159,10 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto else { for (String candidate : names) { Object function = functionCatalog.lookupFunction(candidate); - if (function==null) { + if (function == null) { function = functionCatalog.lookupConsumer(candidate); } - if (function==null) { + if (function == null) { continue; } Class inputType = functionInspector.getInputType(function); @@ -202,8 +206,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto Class inputType = functionInspector.getInputType(function); return m -> { if (functionInspector.isMessage(function)) { - return MessageBuilder.withPayload(convertPayload(inputType, m)) - .copyHeaders(m.getHeaders()).build(); + return MessageUtils.create(function, convertPayload(inputType, m), + m.getHeaders()); } else { return convertPayload(inputType, m); diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.java index 8410d095a..ff581fba3 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.core.FunctionCatalog; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.endpoint.MessageProducerSupport; @@ -86,7 +87,8 @@ public class SupplierInvokingMessageProducer extends MessageProducerSupport { if (supplier != null) { suppliers.add(name); disposables.put(name, - supplier.get().subscribeOn(Schedulers.elastic()).subscribe(m -> send(name, m))); + supplier.get().subscribeOn(Schedulers.elastic()) + .subscribe(m -> send(name, m))); } } } @@ -94,16 +96,10 @@ public class SupplierInvokingMessageProducer extends MessageProducerSupport { } private void send(String name, Object payload) { - Message message; - if (payload instanceof Message) { - message = MessageBuilder.fromMessage((Message) payload) - .setHeaderIfAbsent(StreamConfigurationProperties.ROUTE_KEY, name) - .build(); - } - else { - message = MessageBuilder.withPayload(payload) - .setHeader(StreamConfigurationProperties.ROUTE_KEY, name).build(); - } + Supplier> supplier = functionCatalog.lookupSupplier(name); + Message message = MessageUtils.unpack(supplier, payload); + message = MessageBuilder.fromMessage(message) + .setHeaderIfAbsent(StreamConfigurationProperties.ROUTE_KEY, name).build(); getOutputChannel().send(message); } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/ClassLoaderUtils.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/ClassLoaderUtils.java new file mode 100644 index 000000000..5bd4a2826 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/ClassLoaderUtils.java @@ -0,0 +1,154 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.io.File; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.List; +import java.util.jar.JarFile; + +import org.springframework.util.StringUtils; + +/** + * @author Dave Syer + * + */ +public class ClassLoaderUtils { + + public static ClassLoader createClassLoader() { + URL[] urls = findClassPath(); + if (urls.length == 1) { + URL[] classpath = extractClasspath(urls[0]); + if (classpath != null) { + urls = classpath; + } + } + List child = new ArrayList<>(); + for (URL url : urls) { + child.add(url); + } + for (URL url : urls) { + if (isRoot(StringUtils.getFilename(clean(url.toString())))) { + child.remove(url); + } + } + ClassLoader base = ClassLoaderUtils.class.getClassLoader(); + return new ParentLastURLClassLoader(child.toArray(new URL[0]), base); + } + + private static URL[] extractClasspath(URL url) { + // This works for a jar indirection like in surefire and IntelliJ + if (url.toString().endsWith(".jar")) { + JarFile jar; + try { + jar = new JarFile(new File(url.toURI())); + String path = jar.getManifest().getMainAttributes() + .getValue("Class-Path"); + if (path != null) { + List result = new ArrayList<>(); + for (String element : path.split(" ")) { + result.add(new URL(element)); + } + return result.toArray(new URL[0]); + } + } + catch (Exception e) { + } + } + return null; + } + + private static String clean(String jar) { + // This works with fat jars like Spring Boot where the path elements look like + // jar:file:...something.jar!/. + return jar.endsWith("!/") ? jar.substring(0, jar.length() - 2) : jar; + } + + private static URL[] findClassPath() { + return ((URLClassLoader) ClassLoaderUtils.class.getClassLoader()).getURLs(); + } + + private static boolean isRoot(String file) { + return file.startsWith("reactor-core") || file.startsWith("reactive-streams"); + } + + private static class ParentLastURLClassLoader extends ClassLoader { + private ChildURLClassLoader childClassLoader; + + /** + * This class allows me to call findClass on a classloader + */ + private static class FindClassClassLoader extends ClassLoader { + public FindClassClassLoader(ClassLoader parent) { + super(parent); + } + + @Override + public Class findClass(String name) throws ClassNotFoundException { + return super.findClass(name); + } + } + + /** + * This class delegates (child then parent) for the findClass method for a + * URLClassLoader. We need this because findClass is protected in URLClassLoader + */ + private static class ChildURLClassLoader extends URLClassLoader { + private FindClassClassLoader realParent; + + public ChildURLClassLoader(URL[] urls, FindClassClassLoader realParent) { + super(urls, null); + + this.realParent = realParent; + } + + @Override + public Class findClass(String name) throws ClassNotFoundException { + try { + // first try to use the URLClassLoader findClass + return super.findClass(name); + } + catch (ClassNotFoundException e) { + // if that fails, we ask our real parent classloader to load the class + // (we give up) + return realParent.loadClass(name); + } + } + } + + public ParentLastURLClassLoader(URL[] urls, ClassLoader parent) { + super(parent); + childClassLoader = new ChildURLClassLoader(urls, + new FindClassClassLoader(this.getParent())); + } + + @Override + protected synchronized Class loadClass(String name, boolean resolve) + throws ClassNotFoundException { + try { + // first we try to find a class inside the child classloader + return childClassLoader.findClass(name); + } + catch (ClassNotFoundException e) { + // didn't find it, try the parent + return super.loadClass(name, resolve); + } + } + } +} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedFluxMessagePojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedFluxMessagePojoStreamingFunctionTests.java new file mode 100644 index 000000000..cc1dbcf90 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedFluxMessagePojoStreamingFunctionTests.java @@ -0,0 +1,115 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import javax.annotation.PostConstruct; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.function.context.FunctionRegistration; +import org.springframework.cloud.function.context.FunctionRegistry; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.ClassUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; + +/** + * @author Dave Syer + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = IsolatedFluxMessagePojoStreamingFunctionTests.StreamingFunctionApplication.class) +public class IsolatedFluxMessagePojoStreamingFunctionTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Test + public void test() throws Exception { + processor.input().send( + MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result).isInstanceOf(Message.class); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Autowired + private FunctionRegistry registry; + + @PostConstruct + public void register() { + // TODO: this class loader doesn't really test the isolation properly. Not + // sure why, but if you remove the reflection in MessageUtils the test is + // still green. + ClassLoader loader = ClassLoaderUtils.createClassLoader(); + Class type = ClassUtils.resolveClassName(Uppercase.class.getName(), + loader); + registry.register( + new FunctionRegistration(BeanUtils.instantiate(type)) + .name("uppercase")); + } + + } + + public static class Uppercase + implements Function>, Flux>> { + @Override + public Flux> apply(Flux> flux) { + return flux.map(message -> MessageBuilder + .withPayload(new Foo(message.getPayload().getName().toUpperCase())) + .build()); + } + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedMessagePojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedMessagePojoStreamingFunctionTests.java new file mode 100644 index 000000000..b5b08d5b2 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedMessagePojoStreamingFunctionTests.java @@ -0,0 +1,110 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import javax.annotation.PostConstruct; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.function.context.FunctionRegistration; +import org.springframework.cloud.function.context.FunctionRegistry; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.ClassUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Dave Syer + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = IsolatedMessagePojoStreamingFunctionTests.StreamingFunctionApplication.class) +public class IsolatedMessagePojoStreamingFunctionTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Test + public void test() throws Exception { + processor.input().send( + MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result.getPayload().getClass().getName()) + .isEqualTo(Foo.class.getName()); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Autowired + private FunctionRegistry registry; + + @PostConstruct + public void register() { + ClassLoader loader = ClassLoaderUtils.createClassLoader(); + Class type = ClassUtils.resolveClassName(Uppercase.class.getName(), + loader); + registry.register( + new FunctionRegistration(BeanUtils.instantiate(type)) + .name("uppercase")); + } + + } + + public static class Uppercase implements Function, Message> { + @Override + public Message apply(Message flux) { + return MessageBuilder + .withPayload(new Foo(flux.getPayload().getName().toUpperCase())) + .build(); + } + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java index 384a9ca0d..e6f67912a 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.web.flux.request.FluxRequest; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -72,6 +73,9 @@ public class FunctionController { flux = flux.log(); } Flux result = Flux.from(function.apply(flux)); + if (inspector.isMessage(function)) { + result = result.map(message -> MessageUtils.unpack(function, message)); + } if (logger.isDebugEnabled()) { logger.debug("Handled POST with function"); } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java index c7e60e485..dbe459f3b 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java @@ -30,13 +30,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.web.flux.constants.WebRequestConstants; import org.springframework.cloud.function.web.util.HeaderUtils; import org.springframework.core.MethodParameter; import org.springframework.core.Ordered; import org.springframework.http.MediaType; import org.springframework.http.server.ServletServerHttpRequest; -import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.MessageHeaders; import org.springframework.util.StreamUtils; import org.springframework.web.bind.support.WebDataBinderFactory; import org.springframework.web.context.request.NativeWebRequest; @@ -106,12 +107,10 @@ public class FluxHandlerMethodArgumentResolver } if (message) { List messages = new ArrayList<>(); + MessageHeaders headers = HeaderUtils.fromHttp(new ServletServerHttpRequest( + webRequest.getNativeRequest(HttpServletRequest.class)).getHeaders()); for (Object payload : body) { - messages.add(MessageBuilder.withPayload(payload) - .copyHeaders(HeaderUtils.fromHttp(new ServletServerHttpRequest( - webRequest.getNativeRequest(HttpServletRequest.class)) - .getHeaders())) - .build()); + messages.add(MessageUtils.create(handler, payload, headers)); } body = messages; }