|
|
|
|
@@ -16,11 +16,16 @@
|
|
|
|
|
|
|
|
|
|
package org.springframework.cloud.function.context.catalog;
|
|
|
|
|
|
|
|
|
|
import java.lang.reflect.ParameterizedType;
|
|
|
|
|
import java.lang.reflect.Type;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.Collection;
|
|
|
|
|
import java.util.Collections;
|
|
|
|
|
import java.util.HashMap;
|
|
|
|
|
import java.util.LinkedHashSet;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.Optional;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.function.Consumer;
|
|
|
|
|
import java.util.function.Function;
|
|
|
|
|
@@ -33,6 +38,7 @@ import org.apache.commons.logging.LogFactory;
|
|
|
|
|
import org.reactivestreams.Publisher;
|
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
import reactor.util.function.Tuples;
|
|
|
|
|
|
|
|
|
|
import org.springframework.beans.BeansException;
|
|
|
|
|
import org.springframework.beans.factory.BeanFactory;
|
|
|
|
|
@@ -52,13 +58,21 @@ import org.springframework.context.ConfigurableApplicationContext;
|
|
|
|
|
import org.springframework.core.annotation.AnnotatedElementUtils;
|
|
|
|
|
import org.springframework.core.convert.ConversionService;
|
|
|
|
|
import org.springframework.core.type.StandardMethodMetadata;
|
|
|
|
|
import org.springframework.expression.Expression;
|
|
|
|
|
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
|
|
|
|
import org.springframework.lang.Nullable;
|
|
|
|
|
import org.springframework.messaging.Message;
|
|
|
|
|
import org.springframework.messaging.MessageHeaders;
|
|
|
|
|
import org.springframework.messaging.converter.CompositeMessageConverter;
|
|
|
|
|
import org.springframework.messaging.support.MessageBuilder;
|
|
|
|
|
import org.springframework.util.Assert;
|
|
|
|
|
import org.springframework.util.CollectionUtils;
|
|
|
|
|
import org.springframework.util.MimeType;
|
|
|
|
|
import org.springframework.util.MimeTypeUtils;
|
|
|
|
|
import org.springframework.util.ObjectUtils;
|
|
|
|
|
import org.springframework.util.StringUtils;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Implementation of {@link FunctionRegistry} and {@link FunctionCatalog} which is aware of the
|
|
|
|
|
* underlying {@link BeanFactory} to access available functions. Functions that are registered via
|
|
|
|
|
@@ -74,9 +88,9 @@ public class BeanFactoryAwareFunctionRegistry
|
|
|
|
|
|
|
|
|
|
private ConfigurableApplicationContext applicationContext;
|
|
|
|
|
|
|
|
|
|
private Map<Object, FunctionRegistration<Object>> registrationsByFunction = new HashMap<>();
|
|
|
|
|
private final Map<Object, FunctionRegistration<Object>> registrationsByFunction = new HashMap<>();
|
|
|
|
|
|
|
|
|
|
private Map<String, FunctionRegistration<Object>> registrationsByName = new HashMap<>();
|
|
|
|
|
private final Map<String, FunctionRegistration<Object>> registrationsByName = new HashMap<>();
|
|
|
|
|
|
|
|
|
|
private final ConversionService conversionService;
|
|
|
|
|
|
|
|
|
|
@@ -102,19 +116,11 @@ public class BeanFactoryAwareFunctionRegistry
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
|
public <T> T lookup(String definition, MimeType... acceptedOutputTypes) {
|
|
|
|
|
public <T> T lookup(String definition, String... acceptedOutputTypes) {
|
|
|
|
|
Assert.notEmpty(acceptedOutputTypes, "'acceptedOutputTypes' must not be null or empty");
|
|
|
|
|
return (T) this.compose(null, definition, acceptedOutputTypes);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean isMessage(Object function) {
|
|
|
|
|
if (function instanceof FunctionInvocationWrapper) {
|
|
|
|
|
function = ((FunctionInvocationWrapper) function).target;
|
|
|
|
|
}
|
|
|
|
|
return FunctionInspector.super.isMessage(function);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
|
|
|
|
@Override
|
|
|
|
|
public void afterSingletonsInstantiated() {
|
|
|
|
|
@@ -153,12 +159,96 @@ public class BeanFactoryAwareFunctionRegistry
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public FunctionRegistration<?> getRegistration(Object function) {
|
|
|
|
|
if (function instanceof FunctionInvocationWrapper) {
|
|
|
|
|
FunctionRegistration<?> registration = this.registrationsByFunction.get(function);
|
|
|
|
|
// need to do this due to the deployer not wrapping the actual target into FunctionInvocationWrapper
|
|
|
|
|
// hence the lookup would need to be made by the actual target
|
|
|
|
|
if (registration == null && function instanceof FunctionInvocationWrapper) {
|
|
|
|
|
function = ((FunctionInvocationWrapper) function).target;
|
|
|
|
|
}
|
|
|
|
|
return this.registrationsByFunction.get(function);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Object locateFunction(String name) {
|
|
|
|
|
Object function = null;
|
|
|
|
|
if (this.applicationContext.containsBean(name)) {
|
|
|
|
|
function = this.applicationContext.getBean(name);
|
|
|
|
|
}
|
|
|
|
|
if (function == null) {
|
|
|
|
|
function = this.registrationsByName.get(name);
|
|
|
|
|
}
|
|
|
|
|
return function;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
|
|
|
|
private Function<?, ?> compose(Class<?> type, String definition, String... acceptedOutputTypes) {
|
|
|
|
|
Function<?, ?> resultFunction = null;
|
|
|
|
|
if (this.registrationsByName.containsKey(definition)) {
|
|
|
|
|
Object targetFunction = this.registrationsByName.get(definition).getTarget();
|
|
|
|
|
Type functionType = this.registrationsByName.get(definition).getType().getType();
|
|
|
|
|
resultFunction = new FunctionInvocationWrapper(targetFunction, functionType, definition, acceptedOutputTypes);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
if (StringUtils.isEmpty(definition)) {
|
|
|
|
|
String[] functionNames = this.applicationContext.getBeanNamesForType(Function.class);
|
|
|
|
|
Assert.notEmpty(functionNames, "Can't find any functions in BeanFactory");
|
|
|
|
|
Assert.isTrue(functionNames.length == 1, "Found more then one function in BeanFactory");
|
|
|
|
|
definition = functionNames[0];
|
|
|
|
|
}
|
|
|
|
|
String[] names = StringUtils.delimitedListToStringArray(definition.replaceAll(",", "|").trim(), "|");
|
|
|
|
|
StringBuilder composedNameBuilder = new StringBuilder();
|
|
|
|
|
String prefix = "";
|
|
|
|
|
Type composedFunctionType = null;
|
|
|
|
|
for (String name : names) {
|
|
|
|
|
Object function = this.locateFunction(name);
|
|
|
|
|
if (function == null) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
if (composedFunctionType == null) {
|
|
|
|
|
composedFunctionType = beanDefinitionExists(name)
|
|
|
|
|
? FunctionType.of(FunctionContextUtils.findType(
|
|
|
|
|
(ConfigurableListableBeanFactory) applicationContext.getBeanFactory(), name)).getType()
|
|
|
|
|
: new FunctionType(function.getClass()).getType();
|
|
|
|
|
}
|
|
|
|
|
composedNameBuilder.append(prefix);
|
|
|
|
|
composedNameBuilder.append(name);
|
|
|
|
|
FunctionRegistration<Object> registration;
|
|
|
|
|
Type functionType = null;
|
|
|
|
|
if (function instanceof FunctionRegistration) {
|
|
|
|
|
registration = (FunctionRegistration<Object>) function;
|
|
|
|
|
functionType = registration.getType().getType();
|
|
|
|
|
function = registration.getTarget();
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
String[] aliasNames = this.getAliases(name).toArray(new String[] {});
|
|
|
|
|
functionType = beanDefinitionExists(aliasNames)
|
|
|
|
|
? FunctionType.of(FunctionContextUtils.findType(
|
|
|
|
|
(ConfigurableListableBeanFactory) applicationContext.getBeanFactory(), aliasNames)).getType()
|
|
|
|
|
: new FunctionType(function.getClass()).getType();
|
|
|
|
|
registration = new FunctionRegistration<>(function, name).type(functionType);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
registrationsByFunction.putIfAbsent(function, registration);
|
|
|
|
|
registrationsByName.putIfAbsent(name, registration);
|
|
|
|
|
function = new FunctionInvocationWrapper(function, functionType, composedNameBuilder.toString(), acceptedOutputTypes);
|
|
|
|
|
if (resultFunction == null) {
|
|
|
|
|
resultFunction = (Function<?, ?>) function;
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
composedFunctionType = FunctionTypeUtils.compose(composedFunctionType, functionType);
|
|
|
|
|
resultFunction = new FunctionInvocationWrapper(resultFunction.andThen((Function) function),
|
|
|
|
|
composedFunctionType, composedNameBuilder.toString(), acceptedOutputTypes);
|
|
|
|
|
registration = new FunctionRegistration<Object>(resultFunction, composedNameBuilder.toString())
|
|
|
|
|
.type(composedFunctionType);
|
|
|
|
|
registrationsByFunction.putIfAbsent(resultFunction, registration);
|
|
|
|
|
registrationsByName.putIfAbsent(composedNameBuilder.toString(), registration);
|
|
|
|
|
}
|
|
|
|
|
prefix = "|";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
return resultFunction;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Collection<String> getAliases(String key) {
|
|
|
|
|
Collection<String> names = new LinkedHashSet<>();
|
|
|
|
|
String value = getQualifier(key);
|
|
|
|
|
@@ -185,96 +275,6 @@ public class BeanFactoryAwareFunctionRegistry
|
|
|
|
|
return key;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Object locateFunction(String name) {
|
|
|
|
|
Object function = null;
|
|
|
|
|
if (this.applicationContext.containsBean(name)) {
|
|
|
|
|
function = this.applicationContext.getBean(name);
|
|
|
|
|
}
|
|
|
|
|
if (function == null) {
|
|
|
|
|
function = this.registrationsByName.get(name);
|
|
|
|
|
}
|
|
|
|
|
return function;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
|
|
|
|
private Function<?, ?> compose(Class<?> type, String definition, MimeType... acceptedOutputTypes) {
|
|
|
|
|
Function<?, ?> resultFunction = null;
|
|
|
|
|
if (this.registrationsByName.containsKey(definition)) {
|
|
|
|
|
resultFunction = new FunctionInvocationWrapper(this.registrationsByName.get(definition), false,
|
|
|
|
|
acceptedOutputTypes);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
if (StringUtils.isEmpty(definition)) {
|
|
|
|
|
String[] functionNames = this.applicationContext.getBeanNamesForType(Function.class);
|
|
|
|
|
Assert.notEmpty(functionNames, "Can't find any functions in BeanFactory");
|
|
|
|
|
Assert.isTrue(functionNames.length == 1, "Found more then one function in BeanFactory");
|
|
|
|
|
definition = functionNames[0];
|
|
|
|
|
}
|
|
|
|
|
String[] names = StringUtils.delimitedListToStringArray(definition.replaceAll(",", "|").trim(), "|");
|
|
|
|
|
|
|
|
|
|
FunctionType previousFunctionType = null;
|
|
|
|
|
|
|
|
|
|
StringBuilder composedNameBuilder = new StringBuilder();
|
|
|
|
|
String prefix = "";
|
|
|
|
|
for (String name : names) {
|
|
|
|
|
Object function = this.locateFunction(name);
|
|
|
|
|
if (function == null) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
composedNameBuilder.append(prefix);
|
|
|
|
|
composedNameBuilder.append(name);
|
|
|
|
|
|
|
|
|
|
FunctionRegistration<Object> registration;
|
|
|
|
|
FunctionType funcType;
|
|
|
|
|
if (function instanceof FunctionRegistration) {
|
|
|
|
|
registration = (FunctionRegistration<Object>) function;
|
|
|
|
|
funcType = registration.getType();
|
|
|
|
|
function = registration.getTarget();
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
String[] aliasNames = this.getAliases(name).toArray(new String[] {});
|
|
|
|
|
funcType = beanDefinitionExists(aliasNames)
|
|
|
|
|
? FunctionType.of(FunctionContextUtils.findType(
|
|
|
|
|
(ConfigurableListableBeanFactory) applicationContext.getBeanFactory(), aliasNames))
|
|
|
|
|
: new FunctionType(function.getClass());
|
|
|
|
|
registration = new FunctionRegistration<>(function, name).type(funcType);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
registrationsByFunction.putIfAbsent(function, registration);
|
|
|
|
|
registrationsByName.putIfAbsent(name, registration);
|
|
|
|
|
function = new FunctionInvocationWrapper(registration, false, acceptedOutputTypes);
|
|
|
|
|
if (resultFunction == null) {
|
|
|
|
|
resultFunction = (Function<?, ?>) function;
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
resultFunction = resultFunction.andThen((Function) function);
|
|
|
|
|
if (this.getOutputWrapper(function).isAssignableFrom(Flux.class)) {
|
|
|
|
|
funcType = FunctionType.compose(previousFunctionType.wrap(Flux.class), funcType);
|
|
|
|
|
logger.info("Since composed function " + composedNameBuilder.toString()
|
|
|
|
|
+ " consists of at least one function "
|
|
|
|
|
+ "with return type Publisher, its resulting signature is Function<?, Publisher<?>>");
|
|
|
|
|
}
|
|
|
|
|
else if (this.getOutputWrapper(function).isAssignableFrom(Mono.class)) {
|
|
|
|
|
funcType = FunctionType.compose(previousFunctionType.wrap(Mono.class), funcType);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
funcType = FunctionType.compose(previousFunctionType, funcType);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
registration = new FunctionRegistration<Object>(resultFunction, composedNameBuilder.toString())
|
|
|
|
|
.type(funcType);
|
|
|
|
|
registrationsByFunction.putIfAbsent(resultFunction, registration);
|
|
|
|
|
registrationsByName.putIfAbsent(composedNameBuilder.toString(), registration);
|
|
|
|
|
resultFunction = new FunctionInvocationWrapper(registration, true, acceptedOutputTypes);
|
|
|
|
|
}
|
|
|
|
|
previousFunctionType = funcType;
|
|
|
|
|
prefix = "|";
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return resultFunction;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean beanDefinitionExists(String... names) {
|
|
|
|
|
for (String name : names) {
|
|
|
|
|
if (this.applicationContext.getBeanFactory().containsBeanDefinition(name)) {
|
|
|
|
|
@@ -295,24 +295,25 @@ public class BeanFactoryAwareFunctionRegistry
|
|
|
|
|
|
|
|
|
|
private final Object target;
|
|
|
|
|
|
|
|
|
|
private final FunctionRegistration<?> functionRegistration;
|
|
|
|
|
private final Type functionType;
|
|
|
|
|
|
|
|
|
|
private final boolean composed;
|
|
|
|
|
private boolean composed;
|
|
|
|
|
|
|
|
|
|
private final FunctionTypeConversionHelper functionTypeConversionHelper;
|
|
|
|
|
private final String[] acceptedOutputMimeTypes;
|
|
|
|
|
|
|
|
|
|
private final MimeType[] acceptedOutputTypes;
|
|
|
|
|
private final String functionDefinition;
|
|
|
|
|
|
|
|
|
|
FunctionInvocationWrapper(FunctionRegistration<?> functionRegistration, boolean composed,
|
|
|
|
|
MimeType... acceptedOutputTypes) {
|
|
|
|
|
this.target = functionRegistration.getTarget();
|
|
|
|
|
this.functionRegistration = functionRegistration;
|
|
|
|
|
this.composed = composed;
|
|
|
|
|
this.acceptedOutputTypes = acceptedOutputTypes;
|
|
|
|
|
this.functionTypeConversionHelper = new FunctionTypeConversionHelper(this.functionRegistration,
|
|
|
|
|
conversionService, messageConverter);
|
|
|
|
|
FunctionInvocationWrapper(Object target, Type functionType, String functionDefinition, String... acceptedOutputMimeTypes) {
|
|
|
|
|
this.target = target;
|
|
|
|
|
|
|
|
|
|
this.composed = !target.getClass().getName().contains("EnhancerBySpringCGLIB") && target.getClass().getDeclaredFields().length > 1;
|
|
|
|
|
this.functionType = functionType;
|
|
|
|
|
this.acceptedOutputMimeTypes = acceptedOutputMimeTypes;
|
|
|
|
|
this.functionDefinition = functionDefinition;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void accept(Object input) {
|
|
|
|
|
this.doApply(input, true);
|
|
|
|
|
@@ -325,157 +326,215 @@ public class BeanFactoryAwareFunctionRegistry
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Object get() {
|
|
|
|
|
// wrap/unwrap to/from reactive
|
|
|
|
|
Object input = Mono.class.isAssignableFrom(this.functionRegistration.getType().getInputWrapper())
|
|
|
|
|
Object input = FunctionTypeUtils.isMono(functionType)
|
|
|
|
|
? Mono.empty()
|
|
|
|
|
: (Flux.class.isAssignableFrom(this.functionRegistration.getType().getInputWrapper()) ? Flux.empty()
|
|
|
|
|
: null);
|
|
|
|
|
: (FunctionTypeUtils.isMono(functionType) ? Flux.empty() : null);
|
|
|
|
|
|
|
|
|
|
return this.doApply(input, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public Object getTarget() {
|
|
|
|
|
return this.target;
|
|
|
|
|
public boolean isConsumer() {
|
|
|
|
|
return this.target instanceof Consumer;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
|
private Object doApply(Object input, boolean consumer) {
|
|
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
|
|
logger.debug("Applying function: " + this.functionRegistration.getNames());
|
|
|
|
|
}
|
|
|
|
|
public boolean isFunction() {
|
|
|
|
|
return this.target instanceof Function;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (input != null) {
|
|
|
|
|
input = this.wrapInputToReactiveIfNecessary(input);
|
|
|
|
|
}
|
|
|
|
|
public boolean isSupplier() {
|
|
|
|
|
return this.target instanceof Supplier;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Object result;
|
|
|
|
|
if (input instanceof Publisher) {
|
|
|
|
|
if (input != null && !this.composed) {
|
|
|
|
|
input = this.functionTypeConversionHelper.convertInputIfNecessary(input);
|
|
|
|
|
}
|
|
|
|
|
result = this.applyReactive((Publisher<Object>) input, consumer);
|
|
|
|
|
public Object getTarget() {
|
|
|
|
|
return target;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// public boolean isMultipleOutput() {
|
|
|
|
|
//
|
|
|
|
|
// Type type = FunctionTypeUtils.getInputType(functionType, 0);
|
|
|
|
|
// return FunctionTypeUtils.isFlux(type);
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
|
|
|
|
private Object invokeFunction(Object input) {
|
|
|
|
|
if (target instanceof FunctionInvocationWrapper || target instanceof Function) {
|
|
|
|
|
return ((Function) target).apply(input);
|
|
|
|
|
}
|
|
|
|
|
else if (target instanceof Supplier) {
|
|
|
|
|
return ((Supplier) target).get();
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
if (Publisher.class.isAssignableFrom(this.functionRegistration.getType().getInputWrapper())) {
|
|
|
|
|
throw new IllegalArgumentException("Invoking reactive function as imperative is not "
|
|
|
|
|
+ "allowed. Function name(s): " + this.functionRegistration.getNames());
|
|
|
|
|
((Consumer) target).accept(input);
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
|
|
|
|
private Object doApply(Object input, boolean consumer) {
|
|
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
|
|
logger.debug("Applying function: " + this.functionDefinition);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Object result = null;
|
|
|
|
|
if (input instanceof Publisher) {
|
|
|
|
|
input = this.composed ? input :
|
|
|
|
|
this.convertInputPublisherIfNecessary((Publisher<?>) input, FunctionTypeUtils.getInputType(functionType, 0));
|
|
|
|
|
if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(functionType, 0))) {
|
|
|
|
|
result = this.invokeFunction(input);
|
|
|
|
|
if (result == null) {
|
|
|
|
|
result = Mono.empty();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
if (input != null && !this.composed) {
|
|
|
|
|
input = this.functionTypeConversionHelper.convertInputIfNecessary(input);
|
|
|
|
|
if (this.composed) {
|
|
|
|
|
return input instanceof Mono
|
|
|
|
|
? Mono.from((Publisher<?>) input).transform((Function) target)
|
|
|
|
|
: Flux.from((Publisher<?>) input).transform((Function) target);
|
|
|
|
|
}
|
|
|
|
|
result = this.applyImperative(input, consumer);
|
|
|
|
|
else {
|
|
|
|
|
boolean isConsumer = FunctionTypeUtils.isConsumer(functionType);
|
|
|
|
|
Publisher res;
|
|
|
|
|
if (isConsumer) {
|
|
|
|
|
res = input instanceof Mono ? Mono.from((Publisher) input).doOnNext((Consumer) this.target).then()
|
|
|
|
|
: Flux.from((Publisher) input).doOnNext((Consumer) this.target).then();
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
res = input instanceof Mono
|
|
|
|
|
? Mono.from((Publisher) input).map(value -> this.invokeFunction(value))
|
|
|
|
|
: Flux.from((Publisher) input).map(value -> this.invokeFunction(value));
|
|
|
|
|
}
|
|
|
|
|
return res;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
Type type = FunctionTypeUtils.getInputType(functionType, 0);
|
|
|
|
|
if (!composed && !FunctionTypeUtils.isMultipleInputArguments(functionType) && FunctionTypeUtils.isReactive(type)) {
|
|
|
|
|
Publisher<?> publisher = FunctionTypeUtils.isFlux(type)
|
|
|
|
|
? input == null ? Flux.empty() : Flux.just(input)
|
|
|
|
|
: input == null ? Mono.empty() : Mono.just(input);
|
|
|
|
|
publisher = this.convertInputPublisherIfNecessary(publisher, FunctionTypeUtils.getInputType(functionType, 0));
|
|
|
|
|
result = this.invokeFunction(publisher);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
result = this.invokeFunction(this.composed ? input
|
|
|
|
|
: this.convertInputValueIfNecessary(input, FunctionTypeUtils.getInputType(functionType, 0)));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result = this.functionTypeConversionHelper.convertOutputIfNecessary(result, this.acceptedOutputTypes);
|
|
|
|
|
if (!ObjectUtils.isEmpty(acceptedOutputMimeTypes)) {
|
|
|
|
|
if (result instanceof Publisher) {
|
|
|
|
|
result = this.convertOutputPublisherIfNecessary((Publisher<?>) result, this.acceptedOutputMimeTypes);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
result = this.convertOutputValueIfNecessary(result, this.acceptedOutputMimeTypes);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!(result instanceof Publisher) && this.functionRegistration.getTarget() instanceof Supplier) {
|
|
|
|
|
if (!(result instanceof Publisher) && (!(target instanceof FunctionInvocationWrapper) && target instanceof Supplier)) {
|
|
|
|
|
/*
|
|
|
|
|
* This is ONLY relevant for web, so consider exposing some property or may be
|
|
|
|
|
* the fact that this is a rare case (Supplier) leave it temporarily as is.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
return Flux.just(this.wrapOutputToReactiveIfNecessary(result));
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
return this.wrapOutputToReactiveIfNecessary(result);
|
|
|
|
|
return Flux.just(result);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
|
|
|
|
private Object wrapOutputToReactiveIfNecessary(Object result) {
|
|
|
|
|
if (Flux.class.isAssignableFrom(this.functionRegistration.getType().getOutputWrapper())) {
|
|
|
|
|
result = result instanceof Publisher ? Flux.from((Publisher) result) : Flux.just(result);
|
|
|
|
|
}
|
|
|
|
|
else if (Mono.class.isAssignableFrom(this.functionRegistration.getType().getOutputWrapper())) {
|
|
|
|
|
result = result instanceof Publisher ? Mono.from((Publisher) result) : Mono.just(result);
|
|
|
|
|
}
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* For functions of type `Function<?, Publisher<?>>` the input will be converted
|
|
|
|
|
* to Publisher as well resulting in `Function<Publisher<?>, Publisher<?>>`
|
|
|
|
|
*/
|
|
|
|
|
private Object wrapInputToReactiveIfNecessary(Object input) {
|
|
|
|
|
if (input != null && !(input instanceof Publisher)) { // for Function<Object, Publisher>
|
|
|
|
|
if (Flux.class.isAssignableFrom(this.functionRegistration.getType().getInputWrapper())) {
|
|
|
|
|
input = Flux.just(input);
|
|
|
|
|
}
|
|
|
|
|
else if (Mono.class.isAssignableFrom(this.functionRegistration.getType().getInputWrapper())) {
|
|
|
|
|
input = Mono.just(input);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return input;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
|
|
|
|
private Object applyImperative(Object input, boolean consumer) {
|
|
|
|
|
Object result = null;
|
|
|
|
|
if (this.target instanceof Function) {
|
|
|
|
|
if (Flux.class.isAssignableFrom(this.functionRegistration.getType().getInputWrapper())) {
|
|
|
|
|
result = ((Function) this.target).apply(Flux.just(input));
|
|
|
|
|
// we may need to convert output as well
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
result = ((Function) this.target).apply(input);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else if (this.target instanceof Consumer) {
|
|
|
|
|
((Consumer) this.target).accept(input);
|
|
|
|
|
}
|
|
|
|
|
else if (this.target instanceof Supplier) {
|
|
|
|
|
result = ((Supplier) this.target).get();
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
throw new UnsupportedOperationException(
|
|
|
|
|
"Target of type " + this.target.getClass() + " is not supported");
|
|
|
|
|
}
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
|
|
|
|
private Object applyReactive(Publisher<Object> publisher, boolean consumer) {
|
|
|
|
|
Object result;
|
|
|
|
|
if (this.target instanceof Function) {
|
|
|
|
|
if (Publisher.class.isAssignableFrom(this.functionRegistration.getType().getInputWrapper())) {
|
|
|
|
|
result = ((Function) this.target).apply(publisher);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
if (Void.class.isAssignableFrom(this.functionRegistration.getType().getInputType()) && !functionRegistration.getType().isMessage()) {
|
|
|
|
|
result = ((Function) this.target).apply(null);
|
|
|
|
|
result = publisher instanceof Mono ? Mono.just(result) : Flux.just(result);
|
|
|
|
|
private Object convertOutputValueIfNecessary(Object value, String... acceptedOutputMimeTypes) {
|
|
|
|
|
logger.info("Converting output value ");
|
|
|
|
|
Object convertedValue = null;
|
|
|
|
|
if (FunctionTypeUtils.isMultipleArgumentsHolder(value)) {
|
|
|
|
|
int outputCount = FunctionTypeUtils.getOutputCount(functionType);
|
|
|
|
|
Object[] convertedInputArray = new Object[outputCount];
|
|
|
|
|
for (int i = 0; i < outputCount; i++) {
|
|
|
|
|
Expression parsed = new SpelExpressionParser().parseExpression("getT" + (i + 1) + "()");
|
|
|
|
|
Object outputArgument = parsed.getValue(value);
|
|
|
|
|
if (outputArgument instanceof Publisher) {
|
|
|
|
|
outputArgument = this.convertOutputPublisherIfNecessary((Publisher<?>) outputArgument, acceptedOutputMimeTypes[i]);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
result = publisher instanceof Mono
|
|
|
|
|
? Mono.from(publisher).map(value -> ((Function) this.target).apply(value))
|
|
|
|
|
: Flux.from(publisher).map(value -> ((Function) this.target).apply(value));
|
|
|
|
|
outputArgument = this.convertOutputValueIfNecessary(outputArgument, acceptedOutputMimeTypes);
|
|
|
|
|
}
|
|
|
|
|
convertedInputArray[i] = outputArgument;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else if (this.target instanceof Consumer) {
|
|
|
|
|
if (Publisher.class.isAssignableFrom(this.functionRegistration.getType().getInputWrapper())) {
|
|
|
|
|
((Consumer<Publisher<?>>) this.target).accept(publisher);
|
|
|
|
|
result = null;
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
result = publisher instanceof Flux ? Flux.from(publisher).doOnNext((Consumer) this.target).then()
|
|
|
|
|
: Mono.from(publisher).doOnNext((Consumer) this.target).then();
|
|
|
|
|
if (consumer) {
|
|
|
|
|
((Mono<?>) result).subscribe();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else if (this.target instanceof Supplier) {
|
|
|
|
|
result = ((Supplier<?>) this.target).get();
|
|
|
|
|
convertedValue = Tuples.fromArray(convertedInputArray);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
throw new UnsupportedOperationException(
|
|
|
|
|
"Target of type " + this.target.getClass() + " is not supported");
|
|
|
|
|
List<MimeType> acceptedContentTypes = MimeTypeUtils.parseMimeTypes(acceptedOutputMimeTypes[0].toString());
|
|
|
|
|
for (MimeType acceptedContentType : acceptedContentTypes) {
|
|
|
|
|
try {
|
|
|
|
|
MessageHeaders headers = new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, acceptedContentType));
|
|
|
|
|
convertedValue = messageConverter.toMessage(value, headers);
|
|
|
|
|
if (convertedValue != null) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e) {
|
|
|
|
|
// ignore
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
return convertedValue;
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Publisher<?> convertOutputPublisherIfNecessary(Publisher<?> publisher, String... acceptedOutputMimeTypes) {
|
|
|
|
|
System.out.println("Converting output publisher");
|
|
|
|
|
Publisher<?> result = publisher instanceof Mono
|
|
|
|
|
? Mono.from(publisher).map(value -> this.convertOutputValueIfNecessary(value, acceptedOutputMimeTypes))
|
|
|
|
|
: Flux.from(publisher).map(value -> this.convertOutputValueIfNecessary(value, acceptedOutputMimeTypes));
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Publisher<?> convertInputPublisherIfNecessary(Publisher<?> publisher, Type type) {
|
|
|
|
|
System.out.println("Converting publisher");
|
|
|
|
|
Publisher<?> result = publisher instanceof Mono
|
|
|
|
|
? Mono.from(publisher).map(value -> this.convertInputValueIfNecessary(value, type))
|
|
|
|
|
: Flux.from(publisher).map(value -> this.convertInputValueIfNecessary(value, type));
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Object convertInputValueIfNecessary(Object value, Type type) {
|
|
|
|
|
System.out.println("Converting value");
|
|
|
|
|
Object convertedValue = value;
|
|
|
|
|
if (FunctionTypeUtils.isMultipleArgumentsHolder(value)) {
|
|
|
|
|
int inputCount = FunctionTypeUtils.getInputCount(functionType);
|
|
|
|
|
Object[] convertedInputArray = new Object[inputCount];
|
|
|
|
|
for (int i = 0; i < inputCount; i++) {
|
|
|
|
|
Expression parsed = new SpelExpressionParser().parseExpression("getT" + (i + 1) + "()");
|
|
|
|
|
Object inptArgument = parsed.getValue(value);
|
|
|
|
|
inptArgument = inptArgument instanceof Publisher
|
|
|
|
|
? this.convertInputPublisherIfNecessary((Publisher<?>) inptArgument, FunctionTypeUtils.getInputType(functionType, i))
|
|
|
|
|
: this.convertInputValueIfNecessary(inptArgument, FunctionTypeUtils.getInputType(functionType, i));
|
|
|
|
|
convertedInputArray[i] = inptArgument;
|
|
|
|
|
}
|
|
|
|
|
convertedValue = Tuples.fromArray(convertedInputArray);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
// this needs revisiting as the type is not always Class (think really complex types)
|
|
|
|
|
Type rawType = FunctionTypeUtils.unwrapActualTypeByIndex(type, 0);
|
|
|
|
|
if (!(rawType instanceof Class<?>) && rawType instanceof ParameterizedType) {
|
|
|
|
|
rawType = ((ParameterizedType) rawType).getRawType();
|
|
|
|
|
}
|
|
|
|
|
if (value instanceof Message<?>) { // see AWS adapter with Optional payload
|
|
|
|
|
if (!(((Message<?>) value).getPayload() instanceof Optional)) {
|
|
|
|
|
convertedValue = messageConverter.fromMessage((Message<?>) value, (Class<?>) rawType, type);
|
|
|
|
|
if (FunctionTypeUtils.isMessage(type)) {
|
|
|
|
|
convertedValue = MessageBuilder.withPayload(convertedValue).copyHeaders(((Message<?>) value).getHeaders()).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
if (rawType instanceof Class<?>) { // see AWS adapter with WildardTypeImpl and Azure with Voids
|
|
|
|
|
convertedValue = conversionService.convert(value, (Class<?>) rawType);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return convertedValue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|