From b486e5a10441c405cc04de980ccdd903f068b614 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 27 Jul 2020 14:26:32 +0200 Subject: [PATCH] Add fromMessage wrapper to handle collections --- .../catalog/SimpleFunctionRegistry.java | 54 ++++++++++++++++++- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 1447255ce..f9b4a31cb 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -37,6 +37,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; @@ -775,8 +776,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect if (value instanceof Message) { // see AWS adapter with Optional payload if (messageNeedsConversion(rawType, (Message) value)) { convertedValue = FunctionTypeUtils.isTypeCollection(type) - ? messageConverter.fromMessage((Message) value, (Class) rawType, FunctionTypeUtils.getGenericType(type)) - : messageConverter.fromMessage((Message) value, (Class) rawType); + ? this.fromMessage((Message) value, (Class) rawType, FunctionTypeUtils.getGenericType(type)) + : this.fromMessage((Message) value, (Class) rawType, null); if (logger.isDebugEnabled()) { logger.debug("Converted from Message: " + convertedValue); } @@ -820,6 +821,55 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect return convertedValue; } + private Object fromMessage(Message message, Class rawType, Object conversionHint) { + Stream stream = null; + if (message.getPayload() instanceof Collection) { + stream = ((Collection) message.getPayload()).stream(); + } + else if (message.getPayload().getClass().isArray() && !(message.getPayload() instanceof byte[])) { + stream = Stream.of(message.getPayload()); + } + if (stream != null && !this.isJsonContentType(message)) { + Collection convertedCollection = stream.map(v -> { + Message m = new Message() { + @Override + public Object getPayload() { + return v; + } + + @Override + public MessageHeaders getHeaders() { + return message.getHeaders(); + } + }; + if (conversionHint != null && conversionHint instanceof ParameterizedType) { + Type tClass = FunctionTypeUtils.getImmediateGenericType((ParameterizedType) conversionHint, 0); + if (byte[].class.isAssignableFrom((Class) tClass)) { + return message; + } + return messageConverter.fromMessage(m, (Class) tClass); + } + + return messageConverter.fromMessage(m, rawType, conversionHint); + + }) + .filter(v -> v != null).collect(Collectors.toList()); + return CollectionUtils.isEmpty(convertedCollection) ? null : convertedCollection; + } + else { + return messageConverter.fromMessage(message, rawType, conversionHint); + } + } + + private boolean isJsonContentType(Message message) { + Object ct = message.getHeaders().get(MessageHeaders.CONTENT_TYPE); + if (ct != null) { + ct = ct.toString(); + return ((String) ct).startsWith("application/json"); + } + return false; + } + private boolean isJson(Object value) { String v = value instanceof byte[] ? new String((byte[]) value, StandardCharsets.UTF_8)