diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java index e3011c88d..630f88e09 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java @@ -49,6 +49,9 @@ import org.springframework.cloud.function.core.FluxSupplier; import org.springframework.cloud.function.core.FunctionCatalog; import org.springframework.cloud.function.core.FunctionFactoryMetadata; import org.springframework.cloud.function.core.FunctionFactoryUtils; +import org.springframework.cloud.function.core.IsolatedConsumer; +import org.springframework.cloud.function.core.IsolatedFunction; +import org.springframework.cloud.function.core.IsolatedSupplier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.ResolvableType; @@ -250,7 +253,7 @@ public class ContextFunctionCatalogAutoConfiguration { if (stages.length == 0 && source.size() == 1) { stages = new String[] { source.keySet().iterator().next() }; } - Object function = stages.length>0 ? lookup(stages[0], source) : null; + Object function = stages.length > 0 ? lookup(stages[0], source) : null; if (function == null) { return null; } @@ -453,17 +456,34 @@ public class ContextFunctionCatalogAutoConfiguration { @SuppressWarnings({ "unchecked", "rawtypes" }) private T target(T target, String key) { - if (target instanceof Supplier - && !isFluxSupplier(key, (Supplier) target)) { - target = (T) new FluxSupplier((Supplier) target); + boolean isolated = getClass().getClassLoader() != target.getClass() + .getClassLoader(); + if (target instanceof Supplier) { + boolean flux = isFluxSupplier(key, (Supplier) target); + if (isolated) { + target = (T) new IsolatedSupplier((Supplier) target); + } + if (!flux) { + target = (T) new FluxSupplier((Supplier) target); + } } - else if (target instanceof Function - && !isFluxFunction(key, (Function) target)) { - target = (T) new FluxFunction((Function) target); + else if (target instanceof Function) { + boolean flux = isFluxFunction(key, (Function) target); + if (isolated) { + target = (T) new IsolatedFunction((Function) target); + } + if (!flux) { + target = (T) new FluxFunction((Function) target); + } } - else if (target instanceof Consumer - && !isFluxConsumer(key, (Consumer) target)) { - target = (T) new FluxConsumer((Consumer) target); + else if (target instanceof Consumer) { + boolean flux = isFluxConsumer(key, (Consumer) target); + if (isolated) { + target = (T) new IsolatedConsumer((Consumer) target); + } + if (!flux) { + target = (T) new FluxConsumer((Consumer) target); + } } return target; } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionPostProcessorTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionPostProcessorTests.java index 4143a80b4..75e28e4fd 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionPostProcessorTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionPostProcessorTests.java @@ -16,12 +16,22 @@ package org.springframework.cloud.function.context; +import java.net.URLClassLoader; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; +import org.junit.After; import org.junit.Test; +import org.springframework.beans.BeanUtils; import org.springframework.cloud.function.context.ContextFunctionCatalogAutoConfiguration.ContextFunctionRegistry; +import org.springframework.cloud.function.context.ContextFunctionCatalogAutoConfigurationTests.Bar; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.util.ClassUtils; import static org.assertj.core.api.Assertions.assertThat; @@ -36,6 +46,18 @@ import reactor.core.publisher.Flux; public class ContextFunctionPostProcessorTests { private ContextFunctionRegistry processor = new ContextFunctionRegistry(); + private URLClassLoader classLoader; + private ClassLoader contextClassLoader; + + @After + public void close() throws Exception { + if (this.classLoader != null) { + this.classLoader.close(); + } + if (Thread.currentThread().getContextClassLoader() != null) { + ClassUtils.overrideThreadContextClassLoader(contextClassLoader); + } + } @Test public void basicRegistrationFeatures() { @@ -78,22 +100,93 @@ public class ContextFunctionPostProcessorTests { assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4"); } - protected static class Foos implements Function { + @Test + public void isolatedFunction() { + contextClassLoader = ClassUtils + .overrideThreadContextClassLoader(getClass().getClassLoader()); + processor.register(new FunctionRegistration<>(create(Foos.class)).names("foos")); + @SuppressWarnings("unchecked") + Function, Flux> foos = (Function, Flux>) processor + .lookupFunction("foos"); + assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4"); + } + + @Test + public void isolatedSupplier() { + contextClassLoader = ClassUtils + .overrideThreadContextClassLoader(getClass().getClassLoader()); + processor.register(new FunctionRegistration<>(create(Source.class)).names("source")); + @SuppressWarnings("unchecked") + Supplier> source = (Supplier>) processor + .lookupSupplier("source"); + assertThat(source.get().blockFirst()).isEqualTo(4); + } + + @Test + public void isolatedConsumer() { + contextClassLoader = ClassUtils + .overrideThreadContextClassLoader(getClass().getClassLoader()); + Object target = create(Sink.class); + processor.register(new FunctionRegistration<>(target).names("sink")); + @SuppressWarnings("unchecked") + Consumer> sink = (Consumer>) processor + .lookupConsumer("sink"); + sink.accept(Flux.just("Hello")); + @SuppressWarnings("unchecked") + List values = (List) ReflectionTestUtils.getField(target, "values"); + assertThat(values).contains("Hello"); + } + + private Object create(Class type) { + this.classLoader = new URLClassLoader( + ((URLClassLoader) getClass().getClassLoader()).getURLs(), + getClass().getClassLoader().getParent()); + return BeanUtils + .instantiate(ClassUtils.resolveClassName(type.getName(), classLoader)); + } + + public static class Foos implements Function { @Override public String apply(Integer t) { + assertThat(ClassUtils.resolveClassName(Bar.class.getName(), null) + .getClassLoader()).isEqualTo(getClass().getClassLoader()); return "" + 2 * t; } } - protected static class Bars implements Function { + public static class Bars implements Function { @Override public String apply(String t) { + assertThat(ClassUtils.resolveClassName(Bar.class.getName(), null) + .getClassLoader()).isEqualTo(getClass().getClassLoader()); return "Hello " + t; } } + public static class Sink implements Consumer { + + private List values = new ArrayList<>(); + + @Override + public void accept(String t) { + assertThat(ClassUtils.resolveClassName(Bar.class.getName(), null) + .getClassLoader()).isEqualTo(getClass().getClassLoader()); + values.add(t); + } + + } + + public static class Source implements Supplier { + + @Override + public Integer get() { + return 4; + } + + } + } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedConsumer.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedConsumer.java new file mode 100644 index 000000000..bb43d5cdc --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedConsumer.java @@ -0,0 +1,47 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.core; + +import java.util.function.Consumer; + +import org.springframework.util.ClassUtils; + +/** + * @author Dave Syer + * + */ +public class IsolatedConsumer implements Consumer { + + private final Consumer consumer; + + public IsolatedConsumer(Consumer consumer) { + this.consumer = consumer; + } + + @Override + public void accept(T item) { + ClassLoader context = ClassUtils + .overrideThreadContextClassLoader(consumer.getClass().getClassLoader()); + try { + consumer.accept(item); + } + finally { + ClassUtils.overrideThreadContextClassLoader(context); + } + } + +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedFunction.java new file mode 100644 index 000000000..a5665ca9d --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedFunction.java @@ -0,0 +1,47 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.core; + +import java.util.function.Function; + +import org.springframework.util.ClassUtils; + +/** + * @author Dave Syer + * + */ +public class IsolatedFunction implements Function { + + private final Function function; + + public IsolatedFunction(Function function) { + this.function = function; + } + + @Override + public T apply(S item) { + ClassLoader context = ClassUtils + .overrideThreadContextClassLoader(function.getClass().getClassLoader()); + try { + return function.apply(item); + } + finally { + ClassUtils.overrideThreadContextClassLoader(context); + } + } + +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java new file mode 100644 index 000000000..f42eb06c4 --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java @@ -0,0 +1,47 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.core; + +import java.util.function.Supplier; + +import org.springframework.util.ClassUtils; + +/** + * @author Dave Syer + * + */ +public class IsolatedSupplier implements Supplier { + + private final Supplier supplier; + + public IsolatedSupplier(Supplier supplier) { + this.supplier = supplier; + } + + @Override + public T get() { + ClassLoader context = ClassUtils + .overrideThreadContextClassLoader(supplier.getClass().getClassLoader()); + try { + return supplier.get(); + } + finally { + ClassUtils.overrideThreadContextClassLoader(context); + } + } + +}