|
|
|
|
@@ -31,7 +31,6 @@ import java.util.Map;
|
|
|
|
|
import java.util.Optional;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.TreeSet;
|
|
|
|
|
import java.util.function.BiFunction;
|
|
|
|
|
import java.util.function.Consumer;
|
|
|
|
|
import java.util.function.Function;
|
|
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
@@ -53,6 +52,7 @@ import org.springframework.cloud.function.context.FunctionProperties;
|
|
|
|
|
import org.springframework.cloud.function.context.FunctionRegistration;
|
|
|
|
|
import org.springframework.cloud.function.context.FunctionRegistry;
|
|
|
|
|
import org.springframework.cloud.function.context.config.RoutingFunction;
|
|
|
|
|
import org.springframework.cloud.function.context.message.OutputMessageHeaderEnricher;
|
|
|
|
|
import org.springframework.cloud.function.json.JsonMapper;
|
|
|
|
|
import org.springframework.core.ResolvableType;
|
|
|
|
|
import org.springframework.core.convert.ConversionService;
|
|
|
|
|
@@ -69,20 +69,21 @@ import org.springframework.util.ObjectUtils;
|
|
|
|
|
import org.springframework.util.ReflectionUtils;
|
|
|
|
|
import org.springframework.util.StringUtils;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Implementation of {@link FunctionCatalog} and {@link FunctionRegistry} which
|
|
|
|
|
* does not depend on Spring's {@link BeanFactory}.
|
|
|
|
|
* Each function must be registered with it explicitly to benefit from features
|
|
|
|
|
* such as type conversion, composition, POJO etc.
|
|
|
|
|
* Implementation of {@link FunctionCatalog} and {@link FunctionRegistry} which does not
|
|
|
|
|
* depend on Spring's {@link BeanFactory}. Each function must be registered with it
|
|
|
|
|
* explicitly to benefit from features such as type conversion, composition, POJO etc.
|
|
|
|
|
*
|
|
|
|
|
* @author Oleg Zhurakousky
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspector {
|
|
|
|
|
|
|
|
|
|
protected Log logger = LogFactory.getLog(this.getClass());
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* - do we care about FunctionRegistration after it's been registered? What additional value does it bring?
|
|
|
|
|
* - do we care about FunctionRegistration after it's been registered? What additional
|
|
|
|
|
* value does it bring?
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
@@ -101,7 +102,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
@Autowired(required = false)
|
|
|
|
|
private FunctionAroundWrapper functionAroundWrapper;
|
|
|
|
|
|
|
|
|
|
public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper) {
|
|
|
|
|
public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter,
|
|
|
|
|
JsonMapper jsonMapper) {
|
|
|
|
|
Assert.notNull(messageConverter, "'messageConverter' must not be null");
|
|
|
|
|
Assert.notNull(jsonMapper, "'jsonMapper' must not be null");
|
|
|
|
|
this.conversionService = conversionService;
|
|
|
|
|
@@ -143,7 +145,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
this.functionRegistrations.add(registration);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//-----
|
|
|
|
|
// -----
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Set<String> getNames(Class<?> type) {
|
|
|
|
|
@@ -173,7 +175,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
function = this.compose(type, functionDefinition);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (function != null && !ObjectUtils.isEmpty(expectedOutputMimeTypes)) {
|
|
|
|
|
if (function != null && !ObjectUtils.isEmpty(expectedOutputMimeTypes)) {
|
|
|
|
|
function.expectedOutputContentType = expectedOutputMimeTypes;
|
|
|
|
|
}
|
|
|
|
|
else if (logger.isDebugEnabled()) {
|
|
|
|
|
@@ -186,23 +188,19 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This method will make sure that if there is only one function in catalog
|
|
|
|
|
* it can be looked up by any name or no name.
|
|
|
|
|
* It does so by attempting to determine the default function name
|
|
|
|
|
* (the only function in catalog) and checking if it matches the provided name
|
|
|
|
|
* replacing it if it does not.
|
|
|
|
|
* This method will make sure that if there is only one function in catalog it can be
|
|
|
|
|
* looked up by any name or no name. It does so by attempting to determine the default
|
|
|
|
|
* function name (the only function in catalog) and checking if it matches the
|
|
|
|
|
* provided name replacing it if it does not.
|
|
|
|
|
*/
|
|
|
|
|
String normalizeFunctionDefinition(String functionDefinition) {
|
|
|
|
|
functionDefinition = StringUtils.hasText(functionDefinition)
|
|
|
|
|
? functionDefinition.replaceAll(",", "|")
|
|
|
|
|
functionDefinition = StringUtils.hasText(functionDefinition) ? functionDefinition.replaceAll(",", "|")
|
|
|
|
|
: System.getProperty(FunctionProperties.FUNCTION_DEFINITION, "");
|
|
|
|
|
|
|
|
|
|
if (!this.getNames(null).contains(functionDefinition)) {
|
|
|
|
|
List<String> eligibleFunction = this.getNames(null).stream()
|
|
|
|
|
.filter(name -> !RoutingFunction.FUNCTION_NAME.equals(name))
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
if (eligibleFunction.size() == 1
|
|
|
|
|
&& !eligibleFunction.get(0).equals(functionDefinition)
|
|
|
|
|
.filter(name -> !RoutingFunction.FUNCTION_NAME.equals(name)).collect(Collectors.toList());
|
|
|
|
|
if (eligibleFunction.size() == 1 && !eligibleFunction.get(0).equals(functionDefinition)
|
|
|
|
|
&& !functionDefinition.contains("|")) {
|
|
|
|
|
functionDefinition = eligibleFunction.get(0);
|
|
|
|
|
}
|
|
|
|
|
@@ -211,9 +209,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This is primarily to support spring-cloud-sleauth.
|
|
|
|
|
* There is no current use cases in functions where it is used.
|
|
|
|
|
* The approach may change in the future.
|
|
|
|
|
* This is primarily to support spring-cloud-sleauth. There is no current use cases in
|
|
|
|
|
* functions where it is used. The approach may change in the future.
|
|
|
|
|
*/
|
|
|
|
|
private FunctionInvocationWrapper wrapInAroundAviceIfNecessary(FunctionInvocationWrapper function) {
|
|
|
|
|
FunctionInvocationWrapper wrappedFunction = function;
|
|
|
|
|
@@ -234,12 +231,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
*/
|
|
|
|
|
private FunctionInvocationWrapper findFunctionInFunctionRegistrations(String functionName) {
|
|
|
|
|
FunctionRegistration<?> functionRegistration = this.functionRegistrations.stream()
|
|
|
|
|
.filter(fr -> fr.getNames().contains(functionName))
|
|
|
|
|
.findFirst()
|
|
|
|
|
.orElseGet(() -> null);
|
|
|
|
|
return functionRegistration != null
|
|
|
|
|
? this.invocationWrapperInstance(functionName, functionRegistration.getTarget(), functionRegistration.getType().getType())
|
|
|
|
|
: null;
|
|
|
|
|
.filter(fr -> fr.getNames().contains(functionName)).findFirst().orElseGet(() -> null);
|
|
|
|
|
return functionRegistration != null ? this.invocationWrapperInstance(functionName,
|
|
|
|
|
functionRegistration.getTarget(), functionRegistration.getType().getType()) : null;
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -247,7 +241,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
private FunctionInvocationWrapper compose(Class<?> type, String functionDefinition) {
|
|
|
|
|
String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");
|
|
|
|
|
String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(),
|
|
|
|
|
"|");
|
|
|
|
|
FunctionInvocationWrapper composedFunction = null;
|
|
|
|
|
|
|
|
|
|
for (String functionName : functionNames) {
|
|
|
|
|
@@ -260,9 +255,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
composedFunction = function;
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
FunctionInvocationWrapper andThenFunction =
|
|
|
|
|
invocationWrapperInstance(functionName, function.getTarget(), function.inputType, function.outputType);
|
|
|
|
|
composedFunction = (FunctionInvocationWrapper) composedFunction.andThen((Function<Object, Object>) andThenFunction);
|
|
|
|
|
FunctionInvocationWrapper andThenFunction = invocationWrapperInstance(functionName,
|
|
|
|
|
function.getTarget(), function.inputType, function.outputType);
|
|
|
|
|
composedFunction = (FunctionInvocationWrapper) composedFunction
|
|
|
|
|
.andThen((Function<Object, Object>) andThenFunction);
|
|
|
|
|
}
|
|
|
|
|
this.wrappedFunctionDefinitions.put(composedFunction.functionDefinition, composedFunction);
|
|
|
|
|
}
|
|
|
|
|
@@ -276,14 +272,16 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
/*
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
private FunctionInvocationWrapper invocationWrapperInstance(String functionDefinition, Object target, Type inputType, Type outputType) {
|
|
|
|
|
private FunctionInvocationWrapper invocationWrapperInstance(String functionDefinition, Object target,
|
|
|
|
|
Type inputType, Type outputType) {
|
|
|
|
|
return new FunctionInvocationWrapper(functionDefinition, target, inputType, outputType);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
private FunctionInvocationWrapper invocationWrapperInstance(String functionDefinition, Object target, Type functionType) {
|
|
|
|
|
private FunctionInvocationWrapper invocationWrapperInstance(String functionDefinition, Object target,
|
|
|
|
|
Type functionType) {
|
|
|
|
|
return invocationWrapperInstance(functionDefinition, target,
|
|
|
|
|
FunctionTypeUtils.isSupplier(functionType) ? null : FunctionTypeUtils.getInputType(functionType),
|
|
|
|
|
FunctionTypeUtils.getOutputType(functionType));
|
|
|
|
|
@@ -293,7 +291,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
|
|
public class FunctionInvocationWrapper implements Function<Object, Object>, Consumer<Object>, Supplier<Object>, Runnable {
|
|
|
|
|
public class FunctionInvocationWrapper
|
|
|
|
|
implements Function<Object, Object>, Consumer<Object>, Supplier<Object>, Runnable {
|
|
|
|
|
|
|
|
|
|
private final Object target;
|
|
|
|
|
|
|
|
|
|
@@ -314,17 +313,17 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
private boolean skipOutputConversion;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* This is primarily to support Stream's ability to access
|
|
|
|
|
* un-converted payload (e.g., to evaluate expression on some attribute of a payload)
|
|
|
|
|
* It does not have a setter/getter and can only be set via reflection.
|
|
|
|
|
* It is not intended to remain here and will be removed as soon as particular elements
|
|
|
|
|
* of stream will be refactored to address this.
|
|
|
|
|
* This is primarily to support Stream's ability to access un-converted payload
|
|
|
|
|
* (e.g., to evaluate expression on some attribute of a payload) It does not have
|
|
|
|
|
* a setter/getter and can only be set via reflection. It is not intended to
|
|
|
|
|
* remain here and will be removed as soon as particular elements of stream will
|
|
|
|
|
* be refactored to address this.
|
|
|
|
|
*/
|
|
|
|
|
private Function<Object, Message> enhancer;
|
|
|
|
|
|
|
|
|
|
private BiFunction<Message<?>, Object, Message<?>> outputMessageHeaderEnricher;
|
|
|
|
|
private OutputMessageHeaderEnricher outputMessageHeaderEnricher;
|
|
|
|
|
|
|
|
|
|
void setOutputMessageHeaderEnricher(BiFunction<Message<?>, Object, Message<?>> outputMessageHeaderEnricher) {
|
|
|
|
|
void setOutputMessageHeaderEnricher(OutputMessageHeaderEnricher outputMessageHeaderEnricher) {
|
|
|
|
|
this.outputMessageHeaderEnricher = outputMessageHeaderEnricher;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -336,7 +335,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
this.message = this.inputType != null && FunctionTypeUtils.isMessage(this.inputType);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
FunctionInvocationWrapper(String functionDefinition, Object target, Type inputType, Type outputType) {
|
|
|
|
|
FunctionInvocationWrapper(String functionDefinition, Object target, Type inputType, Type outputType) {
|
|
|
|
|
this.target = target;
|
|
|
|
|
this.inputType = this.normalizeType(inputType);
|
|
|
|
|
this.outputType = this.normalizeType(outputType);
|
|
|
|
|
@@ -346,14 +345,16 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
|
|
|
|
|
public void setSkipInputConversion(boolean skipInputConversion) {
|
|
|
|
|
if (logger.isDebugEnabled() && skipInputConversion) {
|
|
|
|
|
logger.debug("'skipInputConversion' was explicitely set to true. No input conversion will be attempted");
|
|
|
|
|
logger.debug(
|
|
|
|
|
"'skipInputConversion' was explicitely set to true. No input conversion will be attempted");
|
|
|
|
|
}
|
|
|
|
|
this.skipInputConversion = skipInputConversion;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void setSkipOutputConversion(boolean skipOutputConversion) {
|
|
|
|
|
if (logger.isDebugEnabled() && skipOutputConversion) {
|
|
|
|
|
logger.debug("'skipOutputConversion' was explicitely set to true. No output conversion will be attempted");
|
|
|
|
|
logger.debug(
|
|
|
|
|
"'skipOutputConversion' was explicitely set to true. No output conversion will be attempted");
|
|
|
|
|
}
|
|
|
|
|
this.skipOutputConversion = skipOutputConversion;
|
|
|
|
|
}
|
|
|
|
|
@@ -371,23 +372,27 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Return the actual {@link Type} of the item of the provided type.
|
|
|
|
|
* This method is context specific and is not a general purpose utility method. The context is that the provided
|
|
|
|
|
* {@link Type} may represent the input/output of a function where such type could be wrapped in
|
|
|
|
|
* {@link Message}, {@link Flux} or {@link Mono}, so this method returns generic value of such type or itself if not wrapped.
|
|
|
|
|
* @param type typically input or output Type of the function (see {@link #getInputType()} or {@link #getOutputType()}.
|
|
|
|
|
* Return the actual {@link Type} of the item of the provided type. This method is
|
|
|
|
|
* context specific and is not a general purpose utility method. The context is
|
|
|
|
|
* that the provided {@link Type} may represent the input/output of a function
|
|
|
|
|
* where such type could be wrapped in {@link Message}, {@link Flux} or
|
|
|
|
|
* {@link Mono}, so this method returns generic value of such type or itself if
|
|
|
|
|
* not wrapped.
|
|
|
|
|
* @param type typically input or output Type of the function (see
|
|
|
|
|
* {@link #getInputType()} or {@link #getOutputType()}.
|
|
|
|
|
* @return the type of the item if wrapped otherwise the provided type.
|
|
|
|
|
*/
|
|
|
|
|
public Type getItemType(Type type) {
|
|
|
|
|
if (FunctionTypeUtils.isPublisher(type) || FunctionTypeUtils.isMessage(type) || FunctionTypeUtils.isTypeCollection(type)) {
|
|
|
|
|
if (FunctionTypeUtils.isPublisher(type) || FunctionTypeUtils.isMessage(type)
|
|
|
|
|
|| FunctionTypeUtils.isTypeCollection(type)) {
|
|
|
|
|
type = FunctionTypeUtils.getGenericType(type);
|
|
|
|
|
}
|
|
|
|
|
return type;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Use individual {@link #getInputType()}, {@link #getOutputType()} and their variants as well as
|
|
|
|
|
* other supporting operations instead.
|
|
|
|
|
* Use individual {@link #getInputType()}, {@link #getOutputType()} and their
|
|
|
|
|
* variants as well as other supporting operations instead.
|
|
|
|
|
* @deprecated since 3.1
|
|
|
|
|
*/
|
|
|
|
|
@Deprecated
|
|
|
|
|
@@ -420,7 +425,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
public Object apply(Object input) {
|
|
|
|
|
if (logger.isDebugEnabled() && !(input instanceof Publisher)) {
|
|
|
|
|
if (logger.isDebugEnabled() && !(input instanceof Publisher)) {
|
|
|
|
|
logger.debug("Invoking function " + this);
|
|
|
|
|
}
|
|
|
|
|
Object result = this.doApply(input);
|
|
|
|
|
@@ -476,7 +481,6 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
return FunctionTypeUtils.isMessage(this.outputType);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public boolean isRoutingFunction() {
|
|
|
|
|
return this.target instanceof RoutingFunction;
|
|
|
|
|
}
|
|
|
|
|
@@ -487,12 +491,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
|
@Override
|
|
|
|
|
public <V> Function<Object, V> andThen(Function<? super Object, ? extends V> after) {
|
|
|
|
|
Assert.isTrue(after instanceof FunctionInvocationWrapper, "Composed function must be an instanceof FunctionInvocationWrapper.");
|
|
|
|
|
Assert.isTrue(after instanceof FunctionInvocationWrapper,
|
|
|
|
|
"Composed function must be an instanceof FunctionInvocationWrapper.");
|
|
|
|
|
if (FunctionTypeUtils.isMultipleArgumentType(this.inputType)
|
|
|
|
|
|| FunctionTypeUtils.isMultipleArgumentType(this.outputType)
|
|
|
|
|
|| FunctionTypeUtils.isMultipleArgumentType(((FunctionInvocationWrapper) after).inputType)
|
|
|
|
|
|| FunctionTypeUtils.isMultipleArgumentType(((FunctionInvocationWrapper) after).outputType)) {
|
|
|
|
|
throw new UnsupportedOperationException("Composition of functions with multiple arguments is not supported at the moment");
|
|
|
|
|
throw new UnsupportedOperationException(
|
|
|
|
|
"Composition of functions with multiple arguments is not supported at the moment");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Function rawComposedFunction = v -> ((FunctionInvocationWrapper) after).doApply(doApply(v));
|
|
|
|
|
@@ -501,35 +507,39 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
|
|
|
|
|
Type composedFunctionType;
|
|
|
|
|
if (afterWrapper.outputType == null) {
|
|
|
|
|
composedFunctionType = ResolvableType.forClassWithGenerics(Consumer.class, this.inputType == null
|
|
|
|
|
? null
|
|
|
|
|
: ResolvableType.forType(this.inputType)).getType();
|
|
|
|
|
composedFunctionType = ResolvableType.forClassWithGenerics(Consumer.class,
|
|
|
|
|
this.inputType == null ? null : ResolvableType.forType(this.inputType)).getType();
|
|
|
|
|
}
|
|
|
|
|
else if (this.inputType == null && afterWrapper.outputType != null) {
|
|
|
|
|
ResolvableType composedOutputType;
|
|
|
|
|
if (FunctionTypeUtils.isFlux(this.outputType)) {
|
|
|
|
|
composedOutputType = ResolvableType.forClassWithGenerics(Flux.class, ResolvableType.forType(afterWrapper.outputType));
|
|
|
|
|
composedOutputType = ResolvableType.forClassWithGenerics(Flux.class,
|
|
|
|
|
ResolvableType.forType(afterWrapper.outputType));
|
|
|
|
|
}
|
|
|
|
|
else if (FunctionTypeUtils.isMono(this.outputType)) {
|
|
|
|
|
composedOutputType = ResolvableType.forClassWithGenerics(Mono.class, ResolvableType.forType(afterWrapper.outputType));
|
|
|
|
|
composedOutputType = ResolvableType.forClassWithGenerics(Mono.class,
|
|
|
|
|
ResolvableType.forType(afterWrapper.outputType));
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
composedOutputType = ResolvableType.forType(afterWrapper.outputType);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
composedFunctionType = ResolvableType.forClassWithGenerics(Supplier.class, composedOutputType).getType();
|
|
|
|
|
composedFunctionType = ResolvableType.forClassWithGenerics(Supplier.class, composedOutputType)
|
|
|
|
|
.getType();
|
|
|
|
|
}
|
|
|
|
|
else if (this.outputType == null) {
|
|
|
|
|
throw new IllegalArgumentException("Can NOT compose anything with Consumer");
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
composedFunctionType = ResolvableType.forClassWithGenerics(Function.class,
|
|
|
|
|
ResolvableType.forType(this.inputType),
|
|
|
|
|
ResolvableType.forType(((FunctionInvocationWrapper) after).outputType)).getType();
|
|
|
|
|
composedFunctionType = ResolvableType
|
|
|
|
|
.forClassWithGenerics(Function.class, ResolvableType.forType(this.inputType),
|
|
|
|
|
ResolvableType.forType(((FunctionInvocationWrapper) after).outputType))
|
|
|
|
|
.getType();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String composedName = this.functionDefinition + "|" + afterWrapper.functionDefinition;
|
|
|
|
|
FunctionInvocationWrapper composedFunction = invocationWrapperInstance(composedName, rawComposedFunction, composedFunctionType);
|
|
|
|
|
FunctionInvocationWrapper composedFunction = invocationWrapperInstance(composedName, rawComposedFunction,
|
|
|
|
|
composedFunctionType);
|
|
|
|
|
composedFunction.composed = true;
|
|
|
|
|
|
|
|
|
|
return (Function<Object, V>) composedFunction;
|
|
|
|
|
@@ -548,12 +558,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
public String toString() {
|
|
|
|
|
return this.functionDefinition + (this.isComposed() ? "" : "<" + this.inputType + ", " + this.outputType + ">");
|
|
|
|
|
return this.functionDefinition
|
|
|
|
|
+ (this.isComposed() ? "" : "<" + this.inputType + ", " + this.outputType + ">");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns true if this function wrapper represents a composed function.
|
|
|
|
|
* @return true if this function wrapper represents a composed function otherwise false
|
|
|
|
|
* @return true if this function wrapper represents a composed function otherwise
|
|
|
|
|
* false
|
|
|
|
|
*/
|
|
|
|
|
boolean isComposed() {
|
|
|
|
|
return this.composed;
|
|
|
|
|
@@ -593,7 +605,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Will return Object.class if type is represented as TypeVariable(T) or WildcardType(?).
|
|
|
|
|
* Will return Object.class if type is represented as TypeVariable(T) or
|
|
|
|
|
* WildcardType(?).
|
|
|
|
|
*/
|
|
|
|
|
private Type normalizeType(Type type) {
|
|
|
|
|
if (type != null) {
|
|
|
|
|
@@ -606,13 +619,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
private Class<?> getRawClassFor(@Nullable Type type) {
|
|
|
|
|
return type instanceof TypeVariable || type instanceof WildcardType
|
|
|
|
|
? Object.class
|
|
|
|
|
return type instanceof TypeVariable || type instanceof WildcardType ? Object.class
|
|
|
|
|
: FunctionTypeUtils.getRawType(type);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Will wrap the result in a Message if necessary and will copy input headers to the output message.
|
|
|
|
|
* Will wrap the result in a Message if necessary and will copy input headers to
|
|
|
|
|
* the output message.
|
|
|
|
|
*/
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
|
private Object enrichInvocationResultIfNecessary(Object input, Object result) {
|
|
|
|
|
@@ -620,14 +633,17 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
if (result instanceof Message) {
|
|
|
|
|
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
|
|
|
|
|
.getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders());
|
|
|
|
|
this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v));
|
|
|
|
|
this.sanitizeHeaders(((Message) input).getHeaders())
|
|
|
|
|
.forEach((k, v) -> headersMap.putIfAbsent(k, v));
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
Message<Object> output = MessageBuilder.withPayload(result)
|
|
|
|
|
.copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build();
|
|
|
|
|
if (this.outputMessageHeaderEnricher != null) {
|
|
|
|
|
result = this.outputMessageHeaderEnricher.apply((Message<?>) input, result);
|
|
|
|
|
result = this.outputMessageHeaderEnricher.enrich(output);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build();
|
|
|
|
|
result = output;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -651,9 +667,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
private Object fluxifyInputIfNecessary(Object input) {
|
|
|
|
|
if (!(input instanceof Publisher) && this.isTypePublisher(this.inputType) && !FunctionTypeUtils.isMultipleArgumentType(this.inputType)) {
|
|
|
|
|
return input == null
|
|
|
|
|
? FunctionTypeUtils.isMono(this.inputType) ? Mono.empty() : Flux.empty()
|
|
|
|
|
if (!(input instanceof Publisher) && this.isTypePublisher(this.inputType)
|
|
|
|
|
&& !FunctionTypeUtils.isMultipleArgumentType(this.inputType)) {
|
|
|
|
|
return input == null ? FunctionTypeUtils.isMono(this.inputType) ? Mono.empty() : Flux.empty()
|
|
|
|
|
: FunctionTypeUtils.isMono(this.inputType) ? Mono.just(input) : Flux.just(input);
|
|
|
|
|
}
|
|
|
|
|
return input;
|
|
|
|
|
@@ -667,20 +683,24 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
Object result;
|
|
|
|
|
if (!this.isTypePublisher(this.inputType) && convertedInput instanceof Publisher) {
|
|
|
|
|
result = convertedInput instanceof Mono
|
|
|
|
|
? Mono.from((Publisher) convertedInput).map(value -> this.invokeFunctionAndEnrichResultIfNecessary(value))
|
|
|
|
|
.doOnError(ex -> logger.error("Failed to invoke function '" + this.functionDefinition + "'", (Throwable) ex))
|
|
|
|
|
: Flux.from((Publisher) convertedInput).map(value -> this.invokeFunctionAndEnrichResultIfNecessary(value))
|
|
|
|
|
.doOnError(ex -> logger.error("Failed to invoke function '" + this.functionDefinition + "'", (Throwable) ex));
|
|
|
|
|
? Mono.from((Publisher) convertedInput)
|
|
|
|
|
.map(value -> this.invokeFunctionAndEnrichResultIfNecessary(value))
|
|
|
|
|
.doOnError(ex -> logger.error(
|
|
|
|
|
"Failed to invoke function '" + this.functionDefinition + "'", (Throwable) ex))
|
|
|
|
|
: Flux.from((Publisher) convertedInput)
|
|
|
|
|
.map(value -> this.invokeFunctionAndEnrichResultIfNecessary(value))
|
|
|
|
|
.doOnError(ex -> logger.error(
|
|
|
|
|
"Failed to invoke function '" + this.functionDefinition + "'", (Throwable) ex));
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
result = this.invokeFunctionAndEnrichResultIfNecessary(convertedInput);
|
|
|
|
|
if (result instanceof Flux) {
|
|
|
|
|
result = ((Flux) result).doOnError(ex -> logger.error("Failed to invoke function '"
|
|
|
|
|
+ this.functionDefinition + "'", (Throwable) ex));
|
|
|
|
|
result = ((Flux) result).doOnError(ex -> logger
|
|
|
|
|
.error("Failed to invoke function '" + this.functionDefinition + "'", (Throwable) ex));
|
|
|
|
|
}
|
|
|
|
|
else if (result instanceof Mono) {
|
|
|
|
|
result = ((Mono) result).doOnError(ex -> logger.error("Failed to invoke function '"
|
|
|
|
|
+ this.functionDefinition + "'", (Throwable) ex));
|
|
|
|
|
result = ((Mono) result).doOnError(ex -> logger
|
|
|
|
|
.error("Failed to invoke function '" + this.functionDefinition + "'", (Throwable) ex));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return result;
|
|
|
|
|
@@ -707,9 +727,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
}
|
|
|
|
|
Object result = ((Function) this.target).apply(inputValue);
|
|
|
|
|
|
|
|
|
|
return value instanceof OriginalMessageHolder
|
|
|
|
|
? this.enrichInvocationResultIfNecessary(((OriginalMessageHolder) value).getOriginalMessage(), result)
|
|
|
|
|
: result;
|
|
|
|
|
return value instanceof OriginalMessageHolder ? this.enrichInvocationResultIfNecessary(
|
|
|
|
|
((OriginalMessageHolder) value).getOriginalMessage(), result) : result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@@ -720,20 +739,20 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
Object result = null;
|
|
|
|
|
if (this.isTypePublisher(this.inputType)) {
|
|
|
|
|
if (convertedInput instanceof Flux) {
|
|
|
|
|
result = ((Flux) convertedInput)
|
|
|
|
|
.transform(flux -> {
|
|
|
|
|
flux = Flux.from((Publisher) flux).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v));
|
|
|
|
|
((Consumer) this.target).accept(flux);
|
|
|
|
|
return Mono.ignoreElements((Flux) flux);
|
|
|
|
|
}).then();
|
|
|
|
|
result = ((Flux) convertedInput).transform(flux -> {
|
|
|
|
|
flux = Flux.from((Publisher) flux)
|
|
|
|
|
.map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v));
|
|
|
|
|
((Consumer) this.target).accept(flux);
|
|
|
|
|
return Mono.ignoreElements((Flux) flux);
|
|
|
|
|
}).then();
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
result = ((Mono) convertedInput)
|
|
|
|
|
.transform(mono -> {
|
|
|
|
|
mono = Mono.from((Publisher) mono).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v));
|
|
|
|
|
((Consumer) this.target).accept(mono);
|
|
|
|
|
return Mono.ignoreElements((Flux) mono);
|
|
|
|
|
}).then();
|
|
|
|
|
result = ((Mono) convertedInput).transform(mono -> {
|
|
|
|
|
mono = Mono.from((Publisher) mono)
|
|
|
|
|
.map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v));
|
|
|
|
|
((Consumer) this.target).accept(mono);
|
|
|
|
|
return Mono.ignoreElements((Flux) mono);
|
|
|
|
|
}).then();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else if (convertedInput instanceof Publisher) {
|
|
|
|
|
@@ -771,12 +790,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
}
|
|
|
|
|
return parsedArgumentValues;
|
|
|
|
|
}
|
|
|
|
|
throw new UnsupportedOperationException("At the moment only Tuple-based function are supporting multiple arguments");
|
|
|
|
|
throw new UnsupportedOperationException(
|
|
|
|
|
"At the moment only Tuple-based function are supporting multiple arguments");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
|
private boolean isInputConversionNecessary(Object input, Type type) {
|
|
|
|
|
if (type == null || this.getRawClassFor(type) == Void.class || this.target instanceof RoutingFunction || this.isComposed()) {
|
|
|
|
|
if (type == null || this.getRawClassFor(type) == Void.class || this.target instanceof RoutingFunction
|
|
|
|
|
|| this.isComposed()) {
|
|
|
|
|
if (this.getRawClassFor(type) == Void.class) {
|
|
|
|
|
if (input instanceof Message) {
|
|
|
|
|
input = ((Message) input).getPayload();
|
|
|
|
|
@@ -790,6 +811,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
@@ -813,13 +835,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
convertedInput = Tuples.fromArray(convertedInputs);
|
|
|
|
|
}
|
|
|
|
|
else if (this.skipInputConversion) {
|
|
|
|
|
convertedInput = this.isInputTypeMessage()
|
|
|
|
|
? input
|
|
|
|
|
convertedInput = this.isInputTypeMessage() ? input
|
|
|
|
|
: new OriginalMessageHolder(((Message) input).getPayload(), (Message<?>) input);
|
|
|
|
|
}
|
|
|
|
|
else if (input instanceof Message) {
|
|
|
|
|
if (((Message) input).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")
|
|
|
|
|
&& !this.isInputTypeMessage()) { //TODO rework
|
|
|
|
|
if (((Message) input).getPayload().getClass().getName()
|
|
|
|
|
.equals("org.springframework.kafka.support.KafkaNull") && !this.isInputTypeMessage()) { // TODO
|
|
|
|
|
// rework
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -827,12 +849,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
|
|
|
|
|
convertedInput = this.convertInputMessageIfNecessary((Message) input, type);
|
|
|
|
|
if (convertedInput == null) { // give ConversionService a chance
|
|
|
|
|
convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload(), false);
|
|
|
|
|
convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload(),
|
|
|
|
|
false);
|
|
|
|
|
}
|
|
|
|
|
if (convertedInput != null && !FunctionTypeUtils.isMultipleArgumentType(this.inputType)) {
|
|
|
|
|
convertedInput = !convertedInput.equals(input)
|
|
|
|
|
? new OriginalMessageHolder(convertedInput, (Message<?>) input)
|
|
|
|
|
: convertedInput;
|
|
|
|
|
? new OriginalMessageHolder(convertedInput, (Message<?>) input) : convertedInput;
|
|
|
|
|
}
|
|
|
|
|
if (convertedInput != null && logger.isDebugEnabled()) {
|
|
|
|
|
logger.debug("Converted Message: " + input + " to: " + convertedInput);
|
|
|
|
|
@@ -853,16 +875,17 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This is an optional conversion which would only happen if `expected-content-type` is
|
|
|
|
|
* set as a header in a message or explicitly provided as part of the lookup.
|
|
|
|
|
* This is an optional conversion which would only happen if
|
|
|
|
|
* `expected-content-type` is set as a header in a message or explicitly provided
|
|
|
|
|
* as part of the lookup.
|
|
|
|
|
*/
|
|
|
|
|
private Object convertOutputIfNecessary(Object output, Type type, String[] contentType) {
|
|
|
|
|
if (this.skipOutputConversion) {
|
|
|
|
|
return output;
|
|
|
|
|
}
|
|
|
|
|
if (output instanceof Message && !this.containsRetainMessageSignalInHeaders((Message) output)) {
|
|
|
|
|
if (!FunctionTypeUtils.isMessage(type) ||
|
|
|
|
|
(FunctionTypeUtils.isMessage(type) && Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(type)))) {
|
|
|
|
|
if (!FunctionTypeUtils.isMessage(type) || (FunctionTypeUtils.isMessage(type)
|
|
|
|
|
&& Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(type)))) {
|
|
|
|
|
output = ((Message) output).getPayload();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -882,13 +905,16 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
convertedOutput = this.convertOutputPublisherIfNecessary((Publisher) output, type, contentType);
|
|
|
|
|
}
|
|
|
|
|
else if (output instanceof Message) {
|
|
|
|
|
convertedOutput = this.convertOutputMessageIfNecessary(output, ObjectUtils.isEmpty(contentType) ? null : contentType[0]);
|
|
|
|
|
convertedOutput = this.convertOutputMessageIfNecessary(output,
|
|
|
|
|
ObjectUtils.isEmpty(contentType) ? null : contentType[0]);
|
|
|
|
|
}
|
|
|
|
|
else if (output instanceof Collection && this.isOutputTypeMessage()) {
|
|
|
|
|
convertedOutput = this.convertMultipleOutputValuesIfNecessary(output, ObjectUtils.isEmpty(contentType) ? null : contentType);
|
|
|
|
|
convertedOutput = this.convertMultipleOutputValuesIfNecessary(output,
|
|
|
|
|
ObjectUtils.isEmpty(contentType) ? null : contentType);
|
|
|
|
|
}
|
|
|
|
|
else if (ObjectUtils.isArray(output) && !(output instanceof byte[])) {
|
|
|
|
|
convertedOutput = this.convertMultipleOutputValuesIfNecessary(output, ObjectUtils.isEmpty(contentType) ? null : contentType);
|
|
|
|
|
convertedOutput = this.convertMultipleOutputValuesIfNecessary(output,
|
|
|
|
|
ObjectUtils.isEmpty(contentType) ? null : contentType);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
convertedOutput = messageConverter.toMessage(output,
|
|
|
|
|
@@ -899,15 +925,16 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Will check if message contains any of the headers that are considered to serve as
|
|
|
|
|
* signals to retain output as Message (regardless of the output type of function).
|
|
|
|
|
* At this moment presence of 'scf-func-name' header or any header that begins with `lambda'
|
|
|
|
|
* (use by AWS) will result in this method returning true.
|
|
|
|
|
* Will check if message contains any of the headers that are considered to serve
|
|
|
|
|
* as signals to retain output as Message (regardless of the output type of
|
|
|
|
|
* function). At this moment presence of 'scf-func-name' header or any header that
|
|
|
|
|
* begins with `lambda' (use by AWS) will result in this method returning true.
|
|
|
|
|
*/
|
|
|
|
|
/*
|
|
|
|
|
* TODO we need to investigate if this could be extracted into some type of strategy since at
|
|
|
|
|
* the pure core level there is no case for this to ever be true. In fact today it is only AWS Lambda
|
|
|
|
|
* case that requires it since it may contain forwarding url
|
|
|
|
|
* TODO we need to investigate if this could be extracted into some type of
|
|
|
|
|
* strategy since at the pure core level there is no case for this to ever be
|
|
|
|
|
* true. In fact today it is only AWS Lambda case that requires it since it may
|
|
|
|
|
* contain forwarding url
|
|
|
|
|
*/
|
|
|
|
|
private boolean containsRetainMessageSignalInHeaders(Message message) {
|
|
|
|
|
if (new CloudEventAttributes(message.getHeaders()).isValidCloudEvent()) {
|
|
|
|
|
@@ -915,8 +942,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
for (String headerName : message.getHeaders().keySet()) {
|
|
|
|
|
if (headerName.startsWith("lambda") ||
|
|
|
|
|
headerName.startsWith("scf-func-name")) {
|
|
|
|
|
if (headerName.startsWith("lambda") || headerName.startsWith("scf-func-name")) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -941,8 +967,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
convertedInput = SimpleFunctionRegistry.this.jsonMapper.fromJson(input, inputType);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else if (SimpleFunctionRegistry.this.conversionService != null
|
|
|
|
|
&& !rawInputType.equals(input.getClass())
|
|
|
|
|
else if (SimpleFunctionRegistry.this.conversionService != null && !rawInputType.equals(input.getClass())
|
|
|
|
|
&& SimpleFunctionRegistry.this.conversionService.canConvert(input.getClass(), rawInputType)) {
|
|
|
|
|
convertedInput = SimpleFunctionRegistry.this.conversionService.convert(input, rawInputType);
|
|
|
|
|
}
|
|
|
|
|
@@ -956,10 +981,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
private boolean isWrapConvertedInputInMessage(Object convertedInput) {
|
|
|
|
|
return this.inputType != null
|
|
|
|
|
&& FunctionTypeUtils.isMessage(this.inputType)
|
|
|
|
|
&& !(convertedInput instanceof Message)
|
|
|
|
|
&& !(convertedInput instanceof Publisher)
|
|
|
|
|
return this.inputType != null && FunctionTypeUtils.isMessage(this.inputType)
|
|
|
|
|
&& !(convertedInput instanceof Message) && !(convertedInput instanceof Publisher)
|
|
|
|
|
&& !(convertedInput instanceof OriginalMessageHolder);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -967,7 +990,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
private Type extractActualValueTypeIfNecessary(Type type) {
|
|
|
|
|
if (type instanceof ParameterizedType && (FunctionTypeUtils.isPublisher(type) || FunctionTypeUtils.isMessage(type))) {
|
|
|
|
|
if (type instanceof ParameterizedType
|
|
|
|
|
&& (FunctionTypeUtils.isPublisher(type) || FunctionTypeUtils.isMessage(type))) {
|
|
|
|
|
return FunctionTypeUtils.getGenericType(type);
|
|
|
|
|
}
|
|
|
|
|
return type;
|
|
|
|
|
@@ -1008,10 +1032,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
if (this.isInputTypeMessage()) {
|
|
|
|
|
if (convertedInput == null) {
|
|
|
|
|
/*
|
|
|
|
|
* In the event conversion was unsuccessful we simply return the original un-converted message.
|
|
|
|
|
* This will help to deal with issues like KafkaNull and others. However if this was not the intention
|
|
|
|
|
* of the developer, this would be discovered early in the development process where the
|
|
|
|
|
* additional message converter could be added to facilitate the conversion.
|
|
|
|
|
* In the event conversion was unsuccessful we simply return the
|
|
|
|
|
* original un-converted message. This will help to deal with issues
|
|
|
|
|
* like KafkaNull and others. However if this was not the intention of
|
|
|
|
|
* the developer, this would be discovered early in the development
|
|
|
|
|
* process where the additional message converter could be added to
|
|
|
|
|
* facilitate the conversion.
|
|
|
|
|
*/
|
|
|
|
|
logger.info("Input type conversion of payload " + message.getPayload() + " resulted in 'null'. "
|
|
|
|
|
+ "Will use the original message as input.");
|
|
|
|
|
@@ -1019,7 +1045,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
if (!(convertedInput instanceof Message)) {
|
|
|
|
|
convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build();
|
|
|
|
|
convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders())
|
|
|
|
|
.build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1034,10 +1061,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
Object[] multipleValueArguments = this.parseMultipleValueArguments(output, outputTypes.length);
|
|
|
|
|
Object[] convertedOutputs = new Object[outputTypes.length];
|
|
|
|
|
for (int i = 0; i < multipleValueArguments.length; i++) {
|
|
|
|
|
String[] ctToUse = !ObjectUtils.isEmpty(contentType)
|
|
|
|
|
? new String[]{contentType[i]}
|
|
|
|
|
: new String[] {"application/json"};
|
|
|
|
|
Object convertedInput = this.convertOutputIfNecessary(multipleValueArguments[i], outputTypes[i], ctToUse);
|
|
|
|
|
String[] ctToUse = !ObjectUtils.isEmpty(contentType) ? new String[] { contentType[i] }
|
|
|
|
|
: new String[] { "application/json" };
|
|
|
|
|
Object convertedInput = this.convertOutputIfNecessary(multipleValueArguments[i], outputTypes[i],
|
|
|
|
|
ctToUse);
|
|
|
|
|
convertedOutputs[i] = convertedInput;
|
|
|
|
|
}
|
|
|
|
|
return Tuples.fromArray(convertedOutputs);
|
|
|
|
|
@@ -1050,15 +1077,18 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
private Object convertOutputMessageIfNecessary(Object output, String expectedOutputContetntType) {
|
|
|
|
|
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
|
|
|
|
|
.getField(SimpleFunctionRegistry.this.headersField, ((Message) output).getHeaders());
|
|
|
|
|
String contentType = ((Message) output).getHeaders().containsKey(FunctionProperties.EXPECT_CONTENT_TYPE_HEADER)
|
|
|
|
|
? (String) ((Message) output).getHeaders().get(FunctionProperties.EXPECT_CONTENT_TYPE_HEADER)
|
|
|
|
|
String contentType = ((Message) output).getHeaders()
|
|
|
|
|
.containsKey(FunctionProperties.EXPECT_CONTENT_TYPE_HEADER)
|
|
|
|
|
? (String) ((Message) output).getHeaders()
|
|
|
|
|
.get(FunctionProperties.EXPECT_CONTENT_TYPE_HEADER)
|
|
|
|
|
: expectedOutputContetntType;
|
|
|
|
|
|
|
|
|
|
if (StringUtils.hasText(contentType)) {
|
|
|
|
|
String[] expectedContentTypes = StringUtils.delimitedListToStringArray(contentType, ",");
|
|
|
|
|
for (String expectedContentType : expectedContentTypes) {
|
|
|
|
|
headersMap.put(MessageHeaders.CONTENT_TYPE, expectedContentType);
|
|
|
|
|
Object result = messageConverter.toMessage(((Message) output).getPayload(), ((Message) output).getHeaders());
|
|
|
|
|
Object result = messageConverter.toMessage(((Message) output).getPayload(),
|
|
|
|
|
((Message) output).getHeaders());
|
|
|
|
|
if (result != null) {
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
@@ -1072,9 +1102,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
*/
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
|
private Object convertMultipleOutputValuesIfNecessary(Object output, String[] contentType) {
|
|
|
|
|
Collection outputCollection = ObjectUtils.isArray(output) ? CollectionUtils.arrayToList(output) : (Collection) output;
|
|
|
|
|
Collection convertedOutputCollection = outputCollection instanceof List ? new ArrayList<>() : new TreeSet<>();
|
|
|
|
|
Type type = this.isOutputTypeMessage() ? FunctionTypeUtils.getGenericType(this.outputType) : this.outputType;
|
|
|
|
|
Collection outputCollection = ObjectUtils.isArray(output) ? CollectionUtils.arrayToList(output)
|
|
|
|
|
: (Collection) output;
|
|
|
|
|
Collection convertedOutputCollection = outputCollection instanceof List ? new ArrayList<>()
|
|
|
|
|
: new TreeSet<>();
|
|
|
|
|
Type type = this.isOutputTypeMessage() ? FunctionTypeUtils.getGenericType(this.outputType)
|
|
|
|
|
: this.outputType;
|
|
|
|
|
for (Object outToConvert : outputCollection) {
|
|
|
|
|
Object result = this.convertOutputIfNecessary(outToConvert, type, contentType);
|
|
|
|
|
Assert.notNull(result, () -> "Failed to convert output '" + outToConvert + "'");
|
|
|
|
|
@@ -1106,19 +1139,22 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
|
private Object convertOutputPublisherIfNecessary(Publisher publisher, Type type, String[] expectedOutputContentType) {
|
|
|
|
|
private Object convertOutputPublisherIfNecessary(Publisher publisher, Type type,
|
|
|
|
|
String[] expectedOutputContentType) {
|
|
|
|
|
return publisher instanceof Mono
|
|
|
|
|
? Mono.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType))
|
|
|
|
|
.doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex))
|
|
|
|
|
: Flux.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType))
|
|
|
|
|
.doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
private static final class OriginalMessageHolder {
|
|
|
|
|
private static final class OriginalMessageHolder {
|
|
|
|
|
|
|
|
|
|
private final Object value;
|
|
|
|
|
|
|
|
|
|
private final Message<?> originalMessage;
|
|
|
|
|
@@ -1135,5 +1171,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
|
|
|
|
public Message<?> getOriginalMessage() {
|
|
|
|
|
return this.originalMessage;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|