Initial commit of KafkaNull changes to SmartCompositeMessageConverter
This commit is contained in:
@@ -19,6 +19,7 @@ package org.springframework.cloud.function.context.config;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@@ -34,7 +35,6 @@ import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.converter.SmartMessageConverter;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.messaging.support.MessageHeaderAccessor;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
@@ -73,40 +73,41 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
@Nullable
|
||||
public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
|
||||
for (MessageConverter converter : getConverters()) {
|
||||
if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection<?>)) {
|
||||
return message.getPayload();
|
||||
}
|
||||
|
||||
if (message.getPayload() instanceof Iterable && conversionHint != null) {
|
||||
Iterable<Object> iterablePayload = (Iterable) message.getPayload();
|
||||
Type t = FunctionTypeUtils.getImmediateGenericType((Type) conversionHint, 0);
|
||||
Class rawType = FunctionTypeUtils.getRawType(t);
|
||||
List<Object> resultList = new ArrayList<>();
|
||||
for (Object item : iterablePayload) {
|
||||
/*
|
||||
* Somewhere here we can do KafkaNull check or see below
|
||||
*/
|
||||
Message m = MessageBuilder.withPayload(item).copyHeaders(message.getHeaders()).build();
|
||||
Object result = (converter instanceof SmartMessageConverter & rawType != t ?
|
||||
((SmartMessageConverter) converter).fromMessage(m, rawType, t) :
|
||||
converter.fromMessage(m, rawType));
|
||||
if (result != null) {
|
||||
/*
|
||||
* Or most likely here we can do the KafkaNull check and not add it to the list
|
||||
*/
|
||||
resultList.add(result);
|
||||
if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection<?>)) {
|
||||
return message.getPayload();
|
||||
}
|
||||
Object result = null;
|
||||
if (message.getPayload() instanceof Iterable && conversionHint != null) {
|
||||
Iterable<Object> iterablePayload = (Iterable<Object>) message.getPayload();
|
||||
Type genericItemType = FunctionTypeUtils.getImmediateGenericType((Type) conversionHint, 0);
|
||||
Class<?> genericItemRawType = FunctionTypeUtils.getRawType(genericItemType);
|
||||
List<Object> resultList = new ArrayList<>();
|
||||
for (Object item : iterablePayload) {
|
||||
boolean isConverted = false;
|
||||
if (item.getClass().getName().startsWith("org.springframework.kafka.support.KafkaNull")) {
|
||||
resultList.add(item);
|
||||
isConverted = true;
|
||||
}
|
||||
for (Iterator<MessageConverter> iterator = getConverters().iterator(); iterator.hasNext() && !isConverted;) {
|
||||
Message<?> m = MessageBuilder.withPayload(item).copyHeaders(message.getHeaders()).build(); // TODO Message creating may be expensive
|
||||
MessageConverter converter = (MessageConverter) iterator.next();
|
||||
Object conversionResult = (converter instanceof SmartMessageConverter & genericItemRawType != genericItemType ?
|
||||
((SmartMessageConverter) converter).fromMessage(m, genericItemRawType, genericItemType) :
|
||||
converter.fromMessage(m, genericItemRawType));
|
||||
if (conversionResult != null) {
|
||||
resultList.add(conversionResult);
|
||||
isConverted = true;
|
||||
}
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(resultList)) {
|
||||
return resultList;
|
||||
}
|
||||
}
|
||||
else {
|
||||
Object result = (converter instanceof SmartMessageConverter ?
|
||||
result = resultList;
|
||||
}
|
||||
else {
|
||||
for (MessageConverter converter : getConverters()) {
|
||||
result = (converter instanceof SmartMessageConverter ?
|
||||
((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) :
|
||||
converter.fromMessage(message, targetClass));
|
||||
if (result != null) {
|
||||
@@ -114,7 +115,8 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user