Isolate the context class loader if function appears to need it
When a function is created using an isolated class loader it might want to use that class loader again for its invocations, and a lot of tools (Spring etc.) use the context class loader for that kind of thing if they don't have an explicit default value. So we set the context class loader before, and unset it after, the function invocation using a convenience wrapper.
This commit is contained in:
@@ -49,6 +49,9 @@ import org.springframework.cloud.function.core.FluxSupplier;
|
||||
import org.springframework.cloud.function.core.FunctionCatalog;
|
||||
import org.springframework.cloud.function.core.FunctionFactoryMetadata;
|
||||
import org.springframework.cloud.function.core.FunctionFactoryUtils;
|
||||
import org.springframework.cloud.function.core.IsolatedConsumer;
|
||||
import org.springframework.cloud.function.core.IsolatedFunction;
|
||||
import org.springframework.cloud.function.core.IsolatedSupplier;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.ResolvableType;
|
||||
@@ -250,7 +253,7 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
if (stages.length == 0 && source.size() == 1) {
|
||||
stages = new String[] { source.keySet().iterator().next() };
|
||||
}
|
||||
Object function = stages.length>0 ? lookup(stages[0], source) : null;
|
||||
Object function = stages.length > 0 ? lookup(stages[0], source) : null;
|
||||
if (function == null) {
|
||||
return null;
|
||||
}
|
||||
@@ -453,17 +456,34 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
private <T> T target(T target, String key) {
|
||||
if (target instanceof Supplier<?>
|
||||
&& !isFluxSupplier(key, (Supplier<?>) target)) {
|
||||
target = (T) new FluxSupplier((Supplier<?>) target);
|
||||
boolean isolated = getClass().getClassLoader() != target.getClass()
|
||||
.getClassLoader();
|
||||
if (target instanceof Supplier<?>) {
|
||||
boolean flux = isFluxSupplier(key, (Supplier<?>) target);
|
||||
if (isolated) {
|
||||
target = (T) new IsolatedSupplier((Supplier<?>) target);
|
||||
}
|
||||
if (!flux) {
|
||||
target = (T) new FluxSupplier((Supplier<?>) target);
|
||||
}
|
||||
}
|
||||
else if (target instanceof Function<?, ?>
|
||||
&& !isFluxFunction(key, (Function<?, ?>) target)) {
|
||||
target = (T) new FluxFunction((Function<?, ?>) target);
|
||||
else if (target instanceof Function<?, ?>) {
|
||||
boolean flux = isFluxFunction(key, (Function<?, ?>) target);
|
||||
if (isolated) {
|
||||
target = (T) new IsolatedFunction((Function<?, ?>) target);
|
||||
}
|
||||
if (!flux) {
|
||||
target = (T) new FluxFunction((Function<?, ?>) target);
|
||||
}
|
||||
}
|
||||
else if (target instanceof Consumer<?>
|
||||
&& !isFluxConsumer(key, (Consumer<?>) target)) {
|
||||
target = (T) new FluxConsumer((Consumer<?>) target);
|
||||
else if (target instanceof Consumer<?>) {
|
||||
boolean flux = isFluxConsumer(key, (Consumer<?>) target);
|
||||
if (isolated) {
|
||||
target = (T) new IsolatedConsumer((Consumer<?>) target);
|
||||
}
|
||||
if (!flux) {
|
||||
target = (T) new FluxConsumer((Consumer<?>) target);
|
||||
}
|
||||
}
|
||||
return target;
|
||||
}
|
||||
|
||||
@@ -16,12 +16,22 @@
|
||||
|
||||
package org.springframework.cloud.function.context;
|
||||
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.cloud.function.context.ContextFunctionCatalogAutoConfiguration.ContextFunctionRegistry;
|
||||
import org.springframework.cloud.function.context.ContextFunctionCatalogAutoConfigurationTests.Bar;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -36,6 +46,18 @@ import reactor.core.publisher.Flux;
|
||||
public class ContextFunctionPostProcessorTests {
|
||||
|
||||
private ContextFunctionRegistry processor = new ContextFunctionRegistry();
|
||||
private URLClassLoader classLoader;
|
||||
private ClassLoader contextClassLoader;
|
||||
|
||||
@After
|
||||
public void close() throws Exception {
|
||||
if (this.classLoader != null) {
|
||||
this.classLoader.close();
|
||||
}
|
||||
if (Thread.currentThread().getContextClassLoader() != null) {
|
||||
ClassUtils.overrideThreadContextClassLoader(contextClassLoader);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void basicRegistrationFeatures() {
|
||||
@@ -78,22 +100,93 @@ public class ContextFunctionPostProcessorTests {
|
||||
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4");
|
||||
}
|
||||
|
||||
protected static class Foos implements Function<Integer, String> {
|
||||
@Test
|
||||
public void isolatedFunction() {
|
||||
contextClassLoader = ClassUtils
|
||||
.overrideThreadContextClassLoader(getClass().getClassLoader());
|
||||
processor.register(new FunctionRegistration<>(create(Foos.class)).names("foos"));
|
||||
@SuppressWarnings("unchecked")
|
||||
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) processor
|
||||
.lookupFunction("foos");
|
||||
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void isolatedSupplier() {
|
||||
contextClassLoader = ClassUtils
|
||||
.overrideThreadContextClassLoader(getClass().getClassLoader());
|
||||
processor.register(new FunctionRegistration<>(create(Source.class)).names("source"));
|
||||
@SuppressWarnings("unchecked")
|
||||
Supplier<Flux<Integer>> source = (Supplier<Flux<Integer>>) processor
|
||||
.lookupSupplier("source");
|
||||
assertThat(source.get().blockFirst()).isEqualTo(4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void isolatedConsumer() {
|
||||
contextClassLoader = ClassUtils
|
||||
.overrideThreadContextClassLoader(getClass().getClassLoader());
|
||||
Object target = create(Sink.class);
|
||||
processor.register(new FunctionRegistration<>(target).names("sink"));
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<Flux<String>> sink = (Consumer<Flux<String>>) processor
|
||||
.lookupConsumer("sink");
|
||||
sink.accept(Flux.just("Hello"));
|
||||
@SuppressWarnings("unchecked")
|
||||
List<String> values = (List<String>) ReflectionTestUtils.getField(target, "values");
|
||||
assertThat(values).contains("Hello");
|
||||
}
|
||||
|
||||
private Object create(Class<?> type) {
|
||||
this.classLoader = new URLClassLoader(
|
||||
((URLClassLoader) getClass().getClassLoader()).getURLs(),
|
||||
getClass().getClassLoader().getParent());
|
||||
return BeanUtils
|
||||
.instantiate(ClassUtils.resolveClassName(type.getName(), classLoader));
|
||||
}
|
||||
|
||||
public static class Foos implements Function<Integer, String> {
|
||||
|
||||
@Override
|
||||
public String apply(Integer t) {
|
||||
assertThat(ClassUtils.resolveClassName(Bar.class.getName(), null)
|
||||
.getClassLoader()).isEqualTo(getClass().getClassLoader());
|
||||
return "" + 2 * t;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected static class Bars implements Function<String, String> {
|
||||
public static class Bars implements Function<String, String> {
|
||||
|
||||
@Override
|
||||
public String apply(String t) {
|
||||
assertThat(ClassUtils.resolveClassName(Bar.class.getName(), null)
|
||||
.getClassLoader()).isEqualTo(getClass().getClassLoader());
|
||||
return "Hello " + t;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Sink implements Consumer<String> {
|
||||
|
||||
private List<String> values = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public void accept(String t) {
|
||||
assertThat(ClassUtils.resolveClassName(Bar.class.getName(), null)
|
||||
.getClassLoader()).isEqualTo(getClass().getClassLoader());
|
||||
values.add(t);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Source implements Supplier<Integer> {
|
||||
|
||||
@Override
|
||||
public Integer get() {
|
||||
return 4;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user