diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java index 3691d35df..ff69c69dd 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java @@ -19,6 +19,7 @@ package org.springframework.cloud.function.context.config; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import org.apache.commons.logging.Log; @@ -34,7 +35,6 @@ import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageHeaderAccessor; -import org.springframework.util.CollectionUtils; import org.springframework.util.MimeType; import org.springframework.util.StringUtils; @@ -73,40 +73,41 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { return null; } + @SuppressWarnings("unchecked") @Override - @Nullable public Object fromMessage(Message message, Class targetClass, @Nullable Object conversionHint) { - for (MessageConverter converter : getConverters()) { - if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection)) { - return message.getPayload(); - } - - if (message.getPayload() instanceof Iterable && conversionHint != null) { - Iterable iterablePayload = (Iterable) message.getPayload(); - Type t = FunctionTypeUtils.getImmediateGenericType((Type) conversionHint, 0); - Class rawType = FunctionTypeUtils.getRawType(t); - List resultList = new ArrayList<>(); - for (Object item : iterablePayload) { - /* - * Somewhere here we can do KafkaNull check or see below - */ - Message m = MessageBuilder.withPayload(item).copyHeaders(message.getHeaders()).build(); - Object result = (converter instanceof SmartMessageConverter & rawType != t ? - ((SmartMessageConverter) converter).fromMessage(m, rawType, t) : - converter.fromMessage(m, rawType)); - if (result != null) { - /* - * Or most likely here we can do the KafkaNull check and not add it to the list - */ - resultList.add(result); + if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection)) { + return message.getPayload(); + } + Object result = null; + if (message.getPayload() instanceof Iterable && conversionHint != null) { + Iterable iterablePayload = (Iterable) message.getPayload(); + Type genericItemType = FunctionTypeUtils.getImmediateGenericType((Type) conversionHint, 0); + Class genericItemRawType = FunctionTypeUtils.getRawType(genericItemType); + List resultList = new ArrayList<>(); + for (Object item : iterablePayload) { + boolean isConverted = false; + if (item.getClass().getName().startsWith("org.springframework.kafka.support.KafkaNull")) { + resultList.add(item); + isConverted = true; + } + for (Iterator iterator = getConverters().iterator(); iterator.hasNext() && !isConverted;) { + Message m = MessageBuilder.withPayload(item).copyHeaders(message.getHeaders()).build(); // TODO Message creating may be expensive + MessageConverter converter = (MessageConverter) iterator.next(); + Object conversionResult = (converter instanceof SmartMessageConverter & genericItemRawType != genericItemType ? + ((SmartMessageConverter) converter).fromMessage(m, genericItemRawType, genericItemType) : + converter.fromMessage(m, genericItemRawType)); + if (conversionResult != null) { + resultList.add(conversionResult); + isConverted = true; } } - if (!CollectionUtils.isEmpty(resultList)) { - return resultList; - } } - else { - Object result = (converter instanceof SmartMessageConverter ? + result = resultList; + } + else { + for (MessageConverter converter : getConverters()) { + result = (converter instanceof SmartMessageConverter ? ((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) : converter.fromMessage(message, targetClass)); if (result != null) { @@ -114,7 +115,8 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { } } } - return null; + + return result; } @Override