KafkaNull batch attempt

This commit is contained in:
Oleg Zhurakousky
2022-06-07 21:22:25 +02:00
parent 724ba06c11
commit b1a6fc4994

View File

@@ -16,12 +16,15 @@
package org.springframework.cloud.function.context.config;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@@ -29,7 +32,9 @@ import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MimeType;
import org.springframework.util.StringUtils;
@@ -75,11 +80,38 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection<?>)) {
return message.getPayload();
}
Object result = (converter instanceof SmartMessageConverter ?
((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) :
converter.fromMessage(message, targetClass));
if (result != null) {
return result;
if (message.getPayload() instanceof Iterable && conversionHint != null) {
Iterable<Object> iterablePayload = (Iterable) message.getPayload();
Type t = FunctionTypeUtils.getImmediateGenericType((Type) conversionHint, 0);
Class rawType = FunctionTypeUtils.getRawType(t);
List<Object> resultList = new ArrayList<>();
for (Object item : iterablePayload) {
/*
* Somewhere here we can do KafkaNull check or see below
*/
Message m = MessageBuilder.withPayload(item).copyHeaders(message.getHeaders()).build();
Object result = (converter instanceof SmartMessageConverter & rawType != t ?
((SmartMessageConverter) converter).fromMessage(m, rawType, t) :
converter.fromMessage(m, rawType));
if (result != null) {
/*
* Or most likely here we can do the KafkaNull check and not add it to the list
*/
resultList.add(result);
}
}
if (!CollectionUtils.isEmpty(resultList)) {
return resultList;
}
}
else {
Object result = (converter instanceof SmartMessageConverter ?
((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) :
converter.fromMessage(message, targetClass));
if (result != null) {
return result;
}
}
}
return null;