From b1a6fc4994abbc7090f0ea671590e4437c73fdae Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 7 Jun 2022 21:22:25 +0200 Subject: [PATCH] KafkaNull batch attempt --- .../SmartCompositeMessageConverter.java | 42 ++++++++++++++++--- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java index 32524a8f7..3691d35df 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java @@ -16,12 +16,15 @@ package org.springframework.cloud.function.context.config; +import java.lang.reflect.Type; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -29,7 +32,9 @@ import org.springframework.messaging.converter.AbstractMessageConverter; import org.springframework.messaging.converter.CompositeMessageConverter; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.SmartMessageConverter; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageHeaderAccessor; +import org.springframework.util.CollectionUtils; import org.springframework.util.MimeType; import org.springframework.util.StringUtils; @@ -75,11 +80,38 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection)) { return message.getPayload(); } - Object result = (converter instanceof SmartMessageConverter ? - ((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) : - converter.fromMessage(message, targetClass)); - if (result != null) { - return result; + + if (message.getPayload() instanceof Iterable && conversionHint != null) { + Iterable iterablePayload = (Iterable) message.getPayload(); + Type t = FunctionTypeUtils.getImmediateGenericType((Type) conversionHint, 0); + Class rawType = FunctionTypeUtils.getRawType(t); + List resultList = new ArrayList<>(); + for (Object item : iterablePayload) { + /* + * Somewhere here we can do KafkaNull check or see below + */ + Message m = MessageBuilder.withPayload(item).copyHeaders(message.getHeaders()).build(); + Object result = (converter instanceof SmartMessageConverter & rawType != t ? + ((SmartMessageConverter) converter).fromMessage(m, rawType, t) : + converter.fromMessage(m, rawType)); + if (result != null) { + /* + * Or most likely here we can do the KafkaNull check and not add it to the list + */ + resultList.add(result); + } + } + if (!CollectionUtils.isEmpty(resultList)) { + return resultList; + } + } + else { + Object result = (converter instanceof SmartMessageConverter ? + ((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) : + converter.fromMessage(message, targetClass)); + if (result != null) { + return result; + } } } return null;