Fix payload extraction during output conversion

Also, restructured CloudEventsFunctionInvocationHelper's postProcessig logic
This commit is contained in:
Oleg Zhurakousky
2020-12-12 13:48:47 +01:00
parent 7675a9da27
commit 5e468fba73
3 changed files with 33 additions and 12 deletions

View File

@@ -52,8 +52,8 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
@Override
public boolean isRetainOuputAsMessage(Message<?> message) {
return message.getHeaders().containsKey(MessageUtils.MESSAGE_TYPE)
&& message.getHeaders().get(MessageUtils.MESSAGE_TYPE).equals(CloudEventMessageUtils.CLOUDEVENT_VALUE);
return message.getHeaders().containsKey(MessageUtils.TARGET_PROTOCOL) || (message.getHeaders().containsKey(MessageUtils.MESSAGE_TYPE)
&& message.getHeaders().get(MessageUtils.MESSAGE_TYPE).equals(CloudEventMessageUtils.CLOUDEVENT_VALUE));
}
@Override

View File

@@ -72,10 +72,12 @@ public final class FunctionTypeUtils {
* @return 'true' if this type represents a {@link Collection}. Otherwise 'false'.
*/
public static boolean isTypeCollection(Type type) {
if (Collection.class.isAssignableFrom(getRawType(type))) {
return true;
}
type = getGenericType(type);
Type rawType = type instanceof ParameterizedType ? ((ParameterizedType) type).getRawType() : type;
return rawType instanceof Class<?> && Collection.class.isAssignableFrom((Class<?>) rawType);
Class<?> rawType = type instanceof ParameterizedType ? getRawType(type) : (Class<?>) type;
return Collection.class.isAssignableFrom(rawType);
}
/**
@@ -298,8 +300,8 @@ public final class FunctionTypeUtils {
}
public static boolean isCollectionOfMessage(Type type) {
if (isMessage(type)) {
return isTypeCollection(type);
if (isMessage(type) && isTypeCollection(type)) {
return isMessage(getImmediateGenericType(type, 0));
}
return false;
}

View File

@@ -880,6 +880,28 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
return convertedInput;
}
private boolean isExtractPayload(Message<?> message, Type type) {
if (FunctionTypeUtils.isCollectionOfMessage(type)) {
return true;
}
if (FunctionTypeUtils.isMessage(type)) {
return false;
}
Object payload = message.getPayload();
if (ObjectUtils.isArray(payload)) {
payload = CollectionUtils.arrayToList(payload);
}
if (payload instanceof Collection
&& Message.class.isAssignableFrom(CollectionUtils.findCommonElementType((Collection<?>) payload))) {
return true;
}
if (this.containsRetainMessageSignalInHeaders(message)) {
return false;
}
return true;
}
/**
* This is an optional conversion which would only happen if `expected-content-type` is
* set as a header in a message or explicitly provided as part of the lookup.
@@ -888,11 +910,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
if (this.skipOutputConversion) {
return output;
}
if (output instanceof Message && !this.containsRetainMessageSignalInHeaders((Message) output)) {
if (!FunctionTypeUtils.isMessage(type) ||
(FunctionTypeUtils.isMessage(type) && Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(type)))) {
output = ((Message) output).getPayload();
}
if (output instanceof Message && isExtractPayload((Message<?>) output, type)) {
output = ((Message) output).getPayload();
}
if (!(output instanceof Publisher) && this.enhancer != null) {
output = enhancer.apply(output);