Add propagation of HTTP headers
Polish function composition logic
This commit is contained in:
@@ -319,8 +319,11 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
registration = new FunctionRegistration<>(function, name).type(currentFunctionType);
|
||||
}
|
||||
|
||||
registrationsByFunction.putIfAbsent(function, registration);
|
||||
registrationsByName.putIfAbsent(name, registration);
|
||||
if (function instanceof RoutingFunction) {
|
||||
registrationsByFunction.putIfAbsent(function, registration);
|
||||
registrationsByName.putIfAbsent(name, registration);
|
||||
}
|
||||
|
||||
function = new FunctionInvocationWrapper(function, currentFunctionType, name, names.length > 1 ? new String[] {} : acceptedOutputTypes);
|
||||
|
||||
if (originFunctionType == null) {
|
||||
@@ -338,6 +341,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
}
|
||||
prefix = "|";
|
||||
}
|
||||
((FunctionInvocationWrapper) resultFunction).acceptedOutputMimeTypes = acceptedOutputTypes;
|
||||
FunctionRegistration<Object> registration = new FunctionRegistration<Object>(resultFunction, definition)
|
||||
.type(originFunctionType);
|
||||
registrationsByFunction.putIfAbsent(resultFunction, registration);
|
||||
@@ -433,7 +437,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
|
||||
private final boolean composed;
|
||||
|
||||
private final String[] acceptedOutputMimeTypes;
|
||||
String[] acceptedOutputMimeTypes;
|
||||
|
||||
private final String functionDefinition;
|
||||
|
||||
@@ -518,6 +522,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private Object invokeFunction(Object input) {
|
||||
Message incomingMessage = null;
|
||||
if (!this.functionDefinition.startsWith(RoutingFunction.FUNCTION_NAME)) {
|
||||
if (input instanceof Message && !FunctionTypeUtils.isMessage(FunctionTypeUtils.getInputType(functionType, 0))) {
|
||||
incomingMessage = (Message) input;
|
||||
input = incomingMessage.getPayload();
|
||||
}
|
||||
}
|
||||
|
||||
Object invocationResult = null;
|
||||
if (this.target instanceof Function) {
|
||||
invocationResult = ((Function) target).apply(input);
|
||||
@@ -547,10 +559,18 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
logger
|
||||
.debug("Result of invocation of \"" + this.functionDefinition + "\" function is '" + invocationResult + "'");
|
||||
}
|
||||
if (!(invocationResult instanceof Message)) {
|
||||
if (incomingMessage != null && invocationResult != null && incomingMessage.getHeaders().containsKey("scf-func-name")) {
|
||||
invocationResult = MessageBuilder.withPayload(invocationResult)
|
||||
.copyHeaders(incomingMessage.getHeaders())
|
||||
.removeHeader(MessageHeaders.CONTENT_TYPE)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
return invocationResult;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
private Object doApply(Object input, boolean consumer, Function<Message, Message> enricher) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Applying function: " + this.functionDefinition);
|
||||
@@ -759,10 +779,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
Expression parsed = new SpelExpressionParser().parseExpression("getT" + (i + 1) + "()");
|
||||
Object inptArgument = parsed.getValue(value);
|
||||
inptArgument = inptArgument instanceof Publisher
|
||||
? this.convertInputPublisherIfNecessary((Publisher<?>) inptArgument, FunctionTypeUtils
|
||||
.getInputType(functionType, i))
|
||||
: this
|
||||
.convertInputValueIfNecessary(inptArgument, FunctionTypeUtils.getInputType(functionType, i));
|
||||
? this.convertInputPublisherIfNecessary((Publisher<?>) inptArgument, FunctionTypeUtils.getInputType(functionType, i))
|
||||
: this.convertInputValueIfNecessary(inptArgument, FunctionTypeUtils.getInputType(functionType, i));
|
||||
convertedInputArray[i] = inptArgument;
|
||||
}
|
||||
convertedValue = Tuples.fromArray(convertedInputArray);
|
||||
@@ -785,9 +803,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Converted from Message: " + convertedValue);
|
||||
}
|
||||
if (FunctionTypeUtils.isMessage(type)) {
|
||||
|
||||
if (FunctionTypeUtils.isMessage(type) || ((Message<?>) value).getHeaders().containsKey("scf-func-name")) {
|
||||
convertedValue = MessageBuilder.withPayload(convertedValue)
|
||||
.copyHeaders(((Message<?>) value).getHeaders()).build();
|
||||
.copyHeaders(((Message<?>) value).getHeaders()).build();
|
||||
}
|
||||
}
|
||||
else if (!FunctionTypeUtils.isMessage(type)) {
|
||||
|
||||
Reference in New Issue
Block a user