GH-437 Pass ExecutionContext via MessageHeader
Caching it as part of ApplicationContext was not the right idea so this changes it
This commit is contained in:
@@ -20,12 +20,14 @@ import java.io.Closeable;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.reactivestreams.Publisher;
|
||||
@@ -36,14 +38,17 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.cloud.function.context.config.FunctionContextUtils;
|
||||
import org.springframework.cloud.function.context.config.JsonMessageConverter;
|
||||
import org.springframework.cloud.function.context.config.RoutingFunction;
|
||||
import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter;
|
||||
import org.springframework.cloud.function.json.JacksonMapper;
|
||||
import org.springframework.cloud.function.json.JsonMapper;
|
||||
import org.springframework.cloud.function.utils.FunctionClassUtils;
|
||||
import org.springframework.context.ApplicationContextInitializer;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.core.convert.support.GenericConversionService;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@@ -65,7 +70,7 @@ public abstract class AbstractSpringFunctionAdapterInitializer<C> implements Clo
|
||||
/**
|
||||
* Name of the bean for registering the target execution context passed to `initialize(context)` operation.
|
||||
*/
|
||||
public static final String TARGET_EXECUTION_CTX_BEAN_NAME = "targetExecutionContext";
|
||||
public static final String TARGET_EXECUTION_CTX_NAME = "executionContext";
|
||||
|
||||
private final Class<?> configurationClass;
|
||||
|
||||
@@ -113,12 +118,14 @@ public abstract class AbstractSpringFunctionAdapterInitializer<C> implements Clo
|
||||
}
|
||||
logger.info("Initializing: " + this.configurationClass);
|
||||
SpringApplication builder = springApplication();
|
||||
|
||||
this.registerTargetContext(targetContext, builder);
|
||||
ConfigurableApplicationContext context = builder.run();
|
||||
context.getAutowireCapableBeanFactory().autowireBean(this);
|
||||
this.context = context;
|
||||
if (this.catalog == null) {
|
||||
SmartCompositeMessageConverter messageConverter =
|
||||
new SmartCompositeMessageConverter(Collections.singletonList(new JsonMessageConverter(new JacksonMapper(new ObjectMapper()))));
|
||||
this.catalog = new SimpleFunctionRegistry(new GenericConversionService(),
|
||||
messageConverter, new JacksonMapper(new ObjectMapper()));
|
||||
initFunctionConsumerOrSupplierFromContext(targetContext);
|
||||
}
|
||||
else {
|
||||
@@ -126,19 +133,6 @@ public abstract class AbstractSpringFunctionAdapterInitializer<C> implements Clo
|
||||
}
|
||||
}
|
||||
|
||||
private void registerTargetContext(C targetContext, SpringApplication builder) {
|
||||
if (targetContext != null) {
|
||||
builder.addInitializers(new ApplicationContextInitializer<ConfigurableApplicationContext>() {
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void initialize(ConfigurableApplicationContext applicationContext) {
|
||||
((GenericApplicationContext) applicationContext).registerBean(TARGET_EXECUTION_CTX_BEAN_NAME,
|
||||
(Class<C>) targetContext.getClass(), (Supplier<C>) () -> targetContext);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
protected Class<?> getInputType() {
|
||||
|
||||
Object func = function();
|
||||
@@ -170,7 +164,6 @@ public abstract class AbstractSpringFunctionAdapterInitializer<C> implements Clo
|
||||
|
||||
protected Publisher<?> apply(Publisher<?> input) {
|
||||
if (this.function != null) {
|
||||
//return Flux.from(this.function.apply(input));
|
||||
Object result = this.function.apply(input);
|
||||
if (result instanceof Publisher) {
|
||||
return Flux.from((Publisher) result);
|
||||
@@ -278,9 +271,10 @@ public abstract class AbstractSpringFunctionAdapterInitializer<C> implements Clo
|
||||
Type type = FunctionContextUtils.
|
||||
findType(name, this.context.getBeanFactory());
|
||||
|
||||
this.functionRegistration = functionRegistration.type(new FunctionType(type)).wrap();
|
||||
this.functionRegistration = functionRegistration.type(new FunctionType(type));
|
||||
|
||||
return (T) functionRegistration.getTarget();
|
||||
((FunctionRegistry) this.catalog).register(functionRegistration);
|
||||
return this.catalog.lookup(name);
|
||||
}
|
||||
|
||||
private void initFunctionConsumerOrSupplierFromContext(Object targetContext) {
|
||||
|
||||
Reference in New Issue
Block a user