From 1b624c3531703994f9457d986ec6dbf1fd8a652c Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Fri, 16 Feb 2018 08:16:55 +0000 Subject: [PATCH] Support for isolated class loaders extended to cover more functions Functions with Flux and Message (as well as POJOs and Flux of POJO which were already supported) should now work if they are created in an isolated class loader. Preconditions: * The class loaders must have the reactor-core (and reactive-streams) shared between the app and the function. Practically speaking this means there has to be a parent class loader with just reactive types, and sibling children for the app and the function. This is not a new requirement (it was needed for Flux of POJO anyway). * Message types are handled reflectively, so they don't have to be in a shared class loader. But they do have to be on the class path on both sides (obviously). --- ...ntextFunctionCatalogAutoConfiguration.java | 6 + .../context/message/MessageUtils.java | 105 ++++++++++++ .../cloud/function/core/FluxConsumer.java | 7 +- .../cloud/function/core/FluxFunction.java | 7 +- .../cloud/function/core/FluxSupplier.java | 9 +- .../cloud/function/core/FluxWrapper.java | 27 +++ .../cloud/function/core/Isolated.java | 26 +++ .../cloud/function/core/IsolatedConsumer.java | 11 +- .../cloud/function/core/IsolatedFunction.java | 13 +- .../cloud/function/core/IsolatedSupplier.java | 11 +- .../StreamListeningFunctionInvoker.java | 14 +- .../SupplierInvokingMessageProducer.java | 18 +- .../stream/function/ClassLoaderUtils.java | 154 ++++++++++++++++++ ...FluxMessagePojoStreamingFunctionTests.java | 115 +++++++++++++ ...atedMessagePojoStreamingFunctionTests.java | 110 +++++++++++++ .../function/web/flux/FunctionController.java | 4 + .../FluxHandlerMethodArgumentResolver.java | 11 +- 17 files changed, 615 insertions(+), 33 deletions(-) create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxWrapper.java create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/Isolated.java create mode 100644 spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/ClassLoaderUtils.java create mode 100644 spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedFluxMessagePojoStreamingFunctionTests.java create mode 100644 spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedMessagePojoStreamingFunctionTests.java diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 2edc0a31f..0e3c81e57 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -457,6 +457,8 @@ public class ContextFunctionCatalogAutoConfiguration { if (target instanceof Supplier) { type = Supplier.class; findType(target, ParamType.OUTPUT); + findType(target, ParamType.OUTPUT_WRAPPER); + isMessage(target); registration.target(target((Supplier) target, key)); for (String name : registration.getNames()) { this.suppliers.put(name, (Supplier) registration.getTarget()); @@ -465,6 +467,8 @@ public class ContextFunctionCatalogAutoConfiguration { else if (target instanceof Consumer) { type = Consumer.class; findType(target, ParamType.INPUT); + findType(target, ParamType.INPUT_WRAPPER); + isMessage(target); // cache wrapper types registration.target(target((Consumer) target, key)); for (String name : registration.getNames()) { this.consumers.put(name, (Consumer) registration.getTarget()); @@ -474,6 +478,8 @@ public class ContextFunctionCatalogAutoConfiguration { type = Function.class; findType(target, ParamType.INPUT); findType(target, ParamType.OUTPUT); + findType(target, ParamType.INPUT_WRAPPER); + findType(target, ParamType.OUTPUT_WRAPPER); isMessage(target); // cache wrapper types registration.target(target((Function) target, key)); for (String name : registration.getNames()) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java new file mode 100644 index 000000000..375772b47 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java @@ -0,0 +1,105 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.message; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Map; + +import org.springframework.cloud.function.core.FluxWrapper; +import org.springframework.cloud.function.core.Isolated; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.ClassUtils; +import org.springframework.util.ReflectionUtils; + +/** + * @author Dave Syer + * + */ +public abstract class MessageUtils { + + /** + * Create a message for the handler. If the handler is a wrapper for a function in an + * isolated class loader, then the message will be created with the target class + * loader (therefore the {@link Message} class must be on the classpath of the target + * class loader). + * + * @param handler the function that will be applied to the message + * @param payload the payload of the message + * @param headers the headers for the message + * @return a message with the correct class loader + */ + public static Object create(Object handler, Object payload, + Map headers) { + if (handler instanceof FluxWrapper) { + handler = ((FluxWrapper) handler).getTarget(); + } + if (!(handler instanceof Isolated)) { + return MessageBuilder.withPayload(payload).copyHeaders(headers).build(); + } + ClassLoader classLoader = ((Isolated) handler).getClassLoader(); + Class builder = ClassUtils.resolveClassName(MessageBuilder.class.getName(), + classLoader); + Method withPayload = ClassUtils.getMethod(builder, "withPayload", Object.class); + Method copyHeaders = ClassUtils.getMethod(builder, "copyHeaders", Map.class); + Method build = ClassUtils.getMethod(builder, "build"); + Object instance = ReflectionUtils.invokeMethod(withPayload, null, payload); + ReflectionUtils.invokeMethod(copyHeaders, instance, headers); + return ReflectionUtils.invokeMethod(build, instance); + } + + /** + * Convert a message from the handler into one that is safe to consume in the caller's + * class laoder. If the handler is a wrapper for a function in an isolated class + * loader, then the message will be created with the target class loader (therefore + * the {@link Message} class must be on the classpath of the target class loader). + * + * @param handler the function that generated the message + * @param message the message to convert + * @return a message with the correct class loader + */ + public static Message unpack(Object handler, Object message) { + if (handler instanceof FluxWrapper) { + handler = ((FluxWrapper) handler).getTarget(); + } + if (!(handler instanceof Isolated)) { + if (message instanceof Message) { + return (Message) message; + } + return MessageBuilder.withPayload(message).build(); + } + ClassLoader classLoader = ((Isolated) handler).getClassLoader(); + Class type = ClassUtils.resolveClassName(Message.class.getName(), classLoader); + Object payload; + Map headers; + if (type.isAssignableFrom(message.getClass())) { + Method getPayload = ClassUtils.getMethod(type, "getPayload"); + Method getHeaders = ClassUtils.getMethod(type, "getHeaders"); + payload = ReflectionUtils.invokeMethod(getPayload, message); + @SuppressWarnings("unchecked") + Map map = (Map) ReflectionUtils + .invokeMethod(getHeaders, message); + headers = map; + } else { + payload = message; + headers = Collections.emptyMap(); + } + return MessageBuilder.withPayload(payload).copyHeaders(headers).build(); + } + +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java index 6c47f0fb6..4970e3bbf 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java @@ -28,7 +28,7 @@ import reactor.core.publisher.Flux; * * @param input type of target consumer */ -public class FluxConsumer implements Consumer> { +public class FluxConsumer implements Consumer>, FluxWrapper> { private final Consumer consumer; @@ -36,6 +36,11 @@ public class FluxConsumer implements Consumer> { this.consumer = consumer; } + @Override + public Consumer getTarget() { + return this.consumer; + } + @Override public void accept(Flux input) { input.subscribe(t -> consumer.accept(t)); diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java index fe5c53212..74bba239b 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java @@ -29,13 +29,18 @@ import reactor.core.publisher.Flux; * @param input type of target function * @param output type of target function */ -public class FluxFunction implements Function, Flux> { +public class FluxFunction implements Function, Flux>, FluxWrapper> { private final Function function; public FluxFunction(Function function) { this.function = function; } + + @Override + public Function getTarget() { + return this.function; + } @Override public Flux apply(Flux input) { diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxSupplier.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxSupplier.java index 6a843ae00..f736d0143 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxSupplier.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxSupplier.java @@ -33,7 +33,7 @@ import reactor.core.publisher.Flux; * * @param output type of target supplier */ -public class FluxSupplier implements Supplier> { +public class FluxSupplier implements Supplier>, FluxWrapper> { private final Supplier supplier; @@ -42,12 +42,17 @@ public class FluxSupplier implements Supplier> { public FluxSupplier(Supplier supplier) { this(supplier, null); } - + public FluxSupplier(Supplier supplier, Duration period) { this.supplier = supplier; this.period = period; } + @Override + public Supplier getTarget() { + return this.supplier; + } + @Override @SuppressWarnings({ "unchecked", "rawtypes" }) public Flux get() { diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxWrapper.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxWrapper.java new file mode 100644 index 000000000..9062b56bc --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxWrapper.java @@ -0,0 +1,27 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.core; + +/** + * @author Dave Syer + * + */ +public interface FluxWrapper { + + T getTarget(); + +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/Isolated.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/Isolated.java new file mode 100644 index 000000000..be93e1a04 --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/Isolated.java @@ -0,0 +1,26 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.core; + +/** + * @author Dave Syer + * + */ +public interface Isolated { + + ClassLoader getClassLoader(); +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedConsumer.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedConsumer.java index bb43d5cdc..91cb76aea 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedConsumer.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedConsumer.java @@ -24,18 +24,25 @@ import org.springframework.util.ClassUtils; * @author Dave Syer * */ -public class IsolatedConsumer implements Consumer { +public class IsolatedConsumer implements Consumer, Isolated { private final Consumer consumer; + private final ClassLoader classLoader; public IsolatedConsumer(Consumer consumer) { this.consumer = consumer; + this.classLoader = consumer.getClass().getClassLoader(); + } + + @Override + public ClassLoader getClassLoader() { + return this.classLoader; } @Override public void accept(T item) { ClassLoader context = ClassUtils - .overrideThreadContextClassLoader(consumer.getClass().getClassLoader()); + .overrideThreadContextClassLoader(this.classLoader); try { consumer.accept(item); } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedFunction.java index a5665ca9d..d257d1af3 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedFunction.java @@ -24,20 +24,27 @@ import org.springframework.util.ClassUtils; * @author Dave Syer * */ -public class IsolatedFunction implements Function { +public class IsolatedFunction implements Function, Isolated { private final Function function; + private final ClassLoader classLoader; public IsolatedFunction(Function function) { this.function = function; + this.classLoader = function.getClass().getClassLoader(); + } + + @Override + public ClassLoader getClassLoader() { + return this.classLoader; } @Override public T apply(S item) { ClassLoader context = ClassUtils - .overrideThreadContextClassLoader(function.getClass().getClassLoader()); + .overrideThreadContextClassLoader(this.classLoader); try { - return function.apply(item); + return this.function.apply(item); } finally { ClassUtils.overrideThreadContextClassLoader(context); diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java index f42eb06c4..57cacab58 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java @@ -24,18 +24,25 @@ import org.springframework.util.ClassUtils; * @author Dave Syer * */ -public class IsolatedSupplier implements Supplier { +public class IsolatedSupplier implements Supplier, Isolated { private final Supplier supplier; + private final ClassLoader classLoader; public IsolatedSupplier(Supplier supplier) { this.supplier = supplier; + this.classLoader = supplier.getClass().getClassLoader(); + } + + @Override + public ClassLoader getClassLoader() { + return this.classLoader; } @Override public T get() { ClassLoader context = ClassUtils - .overrideThreadContextClassLoader(supplier.getClass().getClassLoader()); + .overrideThreadContextClassLoader(this.classLoader); try { return supplier.get(); } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java index 65767b954..09e88ff2a 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java @@ -27,6 +27,7 @@ import java.util.function.Function; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.core.FunctionCatalog; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; @@ -91,6 +92,9 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto return flux.publish(values -> { Flux result = function .apply(values.map(message -> convertInput(function).apply(message))); + if (this.functionInspector.isMessage(function)) { + result = result.map(message -> MessageUtils.unpack(function, message)); + } Flux> aggregate = headers(values); return result.withLatestFrom(aggregate, (p, m) -> message(p, m)); }); @@ -141,7 +145,7 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto .get(StreamConfigurationProperties.ROUTE_KEY); name = stash(key); } - if (name==null && defaultRoute != null) { + if (name == null && defaultRoute != null) { name = stash(defaultRoute); } if (name == null) { @@ -155,10 +159,10 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto else { for (String candidate : names) { Object function = functionCatalog.lookupFunction(candidate); - if (function==null) { + if (function == null) { function = functionCatalog.lookupConsumer(candidate); } - if (function==null) { + if (function == null) { continue; } Class inputType = functionInspector.getInputType(function); @@ -202,8 +206,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto Class inputType = functionInspector.getInputType(function); return m -> { if (functionInspector.isMessage(function)) { - return MessageBuilder.withPayload(convertPayload(inputType, m)) - .copyHeaders(m.getHeaders()).build(); + return MessageUtils.create(function, convertPayload(inputType, m), + m.getHeaders()); } else { return convertPayload(inputType, m); diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.java index 8410d095a..ff581fba3 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.core.FunctionCatalog; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.endpoint.MessageProducerSupport; @@ -86,7 +87,8 @@ public class SupplierInvokingMessageProducer extends MessageProducerSupport { if (supplier != null) { suppliers.add(name); disposables.put(name, - supplier.get().subscribeOn(Schedulers.elastic()).subscribe(m -> send(name, m))); + supplier.get().subscribeOn(Schedulers.elastic()) + .subscribe(m -> send(name, m))); } } } @@ -94,16 +96,10 @@ public class SupplierInvokingMessageProducer extends MessageProducerSupport { } private void send(String name, Object payload) { - Message message; - if (payload instanceof Message) { - message = MessageBuilder.fromMessage((Message) payload) - .setHeaderIfAbsent(StreamConfigurationProperties.ROUTE_KEY, name) - .build(); - } - else { - message = MessageBuilder.withPayload(payload) - .setHeader(StreamConfigurationProperties.ROUTE_KEY, name).build(); - } + Supplier> supplier = functionCatalog.lookupSupplier(name); + Message message = MessageUtils.unpack(supplier, payload); + message = MessageBuilder.fromMessage(message) + .setHeaderIfAbsent(StreamConfigurationProperties.ROUTE_KEY, name).build(); getOutputChannel().send(message); } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/ClassLoaderUtils.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/ClassLoaderUtils.java new file mode 100644 index 000000000..5bd4a2826 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/ClassLoaderUtils.java @@ -0,0 +1,154 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.io.File; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.List; +import java.util.jar.JarFile; + +import org.springframework.util.StringUtils; + +/** + * @author Dave Syer + * + */ +public class ClassLoaderUtils { + + public static ClassLoader createClassLoader() { + URL[] urls = findClassPath(); + if (urls.length == 1) { + URL[] classpath = extractClasspath(urls[0]); + if (classpath != null) { + urls = classpath; + } + } + List child = new ArrayList<>(); + for (URL url : urls) { + child.add(url); + } + for (URL url : urls) { + if (isRoot(StringUtils.getFilename(clean(url.toString())))) { + child.remove(url); + } + } + ClassLoader base = ClassLoaderUtils.class.getClassLoader(); + return new ParentLastURLClassLoader(child.toArray(new URL[0]), base); + } + + private static URL[] extractClasspath(URL url) { + // This works for a jar indirection like in surefire and IntelliJ + if (url.toString().endsWith(".jar")) { + JarFile jar; + try { + jar = new JarFile(new File(url.toURI())); + String path = jar.getManifest().getMainAttributes() + .getValue("Class-Path"); + if (path != null) { + List result = new ArrayList<>(); + for (String element : path.split(" ")) { + result.add(new URL(element)); + } + return result.toArray(new URL[0]); + } + } + catch (Exception e) { + } + } + return null; + } + + private static String clean(String jar) { + // This works with fat jars like Spring Boot where the path elements look like + // jar:file:...something.jar!/. + return jar.endsWith("!/") ? jar.substring(0, jar.length() - 2) : jar; + } + + private static URL[] findClassPath() { + return ((URLClassLoader) ClassLoaderUtils.class.getClassLoader()).getURLs(); + } + + private static boolean isRoot(String file) { + return file.startsWith("reactor-core") || file.startsWith("reactive-streams"); + } + + private static class ParentLastURLClassLoader extends ClassLoader { + private ChildURLClassLoader childClassLoader; + + /** + * This class allows me to call findClass on a classloader + */ + private static class FindClassClassLoader extends ClassLoader { + public FindClassClassLoader(ClassLoader parent) { + super(parent); + } + + @Override + public Class findClass(String name) throws ClassNotFoundException { + return super.findClass(name); + } + } + + /** + * This class delegates (child then parent) for the findClass method for a + * URLClassLoader. We need this because findClass is protected in URLClassLoader + */ + private static class ChildURLClassLoader extends URLClassLoader { + private FindClassClassLoader realParent; + + public ChildURLClassLoader(URL[] urls, FindClassClassLoader realParent) { + super(urls, null); + + this.realParent = realParent; + } + + @Override + public Class findClass(String name) throws ClassNotFoundException { + try { + // first try to use the URLClassLoader findClass + return super.findClass(name); + } + catch (ClassNotFoundException e) { + // if that fails, we ask our real parent classloader to load the class + // (we give up) + return realParent.loadClass(name); + } + } + } + + public ParentLastURLClassLoader(URL[] urls, ClassLoader parent) { + super(parent); + childClassLoader = new ChildURLClassLoader(urls, + new FindClassClassLoader(this.getParent())); + } + + @Override + protected synchronized Class loadClass(String name, boolean resolve) + throws ClassNotFoundException { + try { + // first we try to find a class inside the child classloader + return childClassLoader.findClass(name); + } + catch (ClassNotFoundException e) { + // didn't find it, try the parent + return super.loadClass(name, resolve); + } + } + } +} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedFluxMessagePojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedFluxMessagePojoStreamingFunctionTests.java new file mode 100644 index 000000000..cc1dbcf90 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedFluxMessagePojoStreamingFunctionTests.java @@ -0,0 +1,115 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import javax.annotation.PostConstruct; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.function.context.FunctionRegistration; +import org.springframework.cloud.function.context.FunctionRegistry; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.ClassUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; + +/** + * @author Dave Syer + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = IsolatedFluxMessagePojoStreamingFunctionTests.StreamingFunctionApplication.class) +public class IsolatedFluxMessagePojoStreamingFunctionTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Test + public void test() throws Exception { + processor.input().send( + MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result).isInstanceOf(Message.class); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Autowired + private FunctionRegistry registry; + + @PostConstruct + public void register() { + // TODO: this class loader doesn't really test the isolation properly. Not + // sure why, but if you remove the reflection in MessageUtils the test is + // still green. + ClassLoader loader = ClassLoaderUtils.createClassLoader(); + Class type = ClassUtils.resolveClassName(Uppercase.class.getName(), + loader); + registry.register( + new FunctionRegistration(BeanUtils.instantiate(type)) + .name("uppercase")); + } + + } + + public static class Uppercase + implements Function>, Flux>> { + @Override + public Flux> apply(Flux> flux) { + return flux.map(message -> MessageBuilder + .withPayload(new Foo(message.getPayload().getName().toUpperCase())) + .build()); + } + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedMessagePojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedMessagePojoStreamingFunctionTests.java new file mode 100644 index 000000000..b5b08d5b2 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/IsolatedMessagePojoStreamingFunctionTests.java @@ -0,0 +1,110 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import javax.annotation.PostConstruct; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.function.context.FunctionRegistration; +import org.springframework.cloud.function.context.FunctionRegistry; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.ClassUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Dave Syer + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = IsolatedMessagePojoStreamingFunctionTests.StreamingFunctionApplication.class) +public class IsolatedMessagePojoStreamingFunctionTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Test + public void test() throws Exception { + processor.input().send( + MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result.getPayload().getClass().getName()) + .isEqualTo(Foo.class.getName()); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Autowired + private FunctionRegistry registry; + + @PostConstruct + public void register() { + ClassLoader loader = ClassLoaderUtils.createClassLoader(); + Class type = ClassUtils.resolveClassName(Uppercase.class.getName(), + loader); + registry.register( + new FunctionRegistration(BeanUtils.instantiate(type)) + .name("uppercase")); + } + + } + + public static class Uppercase implements Function, Message> { + @Override + public Message apply(Message flux) { + return MessageBuilder + .withPayload(new Foo(flux.getPayload().getName().toUpperCase())) + .build(); + } + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java index 384a9ca0d..e6f67912a 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.web.flux.request.FluxRequest; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -72,6 +73,9 @@ public class FunctionController { flux = flux.log(); } Flux result = Flux.from(function.apply(flux)); + if (inspector.isMessage(function)) { + result = result.map(message -> MessageUtils.unpack(function, message)); + } if (logger.isDebugEnabled()) { logger.debug("Handled POST with function"); } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java index c7e60e485..dbe459f3b 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java @@ -30,13 +30,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.web.flux.constants.WebRequestConstants; import org.springframework.cloud.function.web.util.HeaderUtils; import org.springframework.core.MethodParameter; import org.springframework.core.Ordered; import org.springframework.http.MediaType; import org.springframework.http.server.ServletServerHttpRequest; -import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.MessageHeaders; import org.springframework.util.StreamUtils; import org.springframework.web.bind.support.WebDataBinderFactory; import org.springframework.web.context.request.NativeWebRequest; @@ -106,12 +107,10 @@ public class FluxHandlerMethodArgumentResolver } if (message) { List messages = new ArrayList<>(); + MessageHeaders headers = HeaderUtils.fromHttp(new ServletServerHttpRequest( + webRequest.getNativeRequest(HttpServletRequest.class)).getHeaders()); for (Object payload : body) { - messages.add(MessageBuilder.withPayload(payload) - .copyHeaders(HeaderUtils.fromHttp(new ServletServerHttpRequest( - webRequest.getNativeRequest(HttpServletRequest.class)) - .getHeaders())) - .build()); + messages.add(MessageUtils.create(handler, payload, headers)); } body = messages; }