Add fromMessage wrapper to handle collections
This commit is contained in:
@@ -37,6 +37,7 @@ import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.aopalliance.intercept.MethodInterceptor;
|
||||
import org.aopalliance.intercept.MethodInvocation;
|
||||
@@ -775,8 +776,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
if (value instanceof Message<?>) { // see AWS adapter with Optional payload
|
||||
if (messageNeedsConversion(rawType, (Message<?>) value)) {
|
||||
convertedValue = FunctionTypeUtils.isTypeCollection(type)
|
||||
? messageConverter.fromMessage((Message<?>) value, (Class<?>) rawType, FunctionTypeUtils.getGenericType(type))
|
||||
: messageConverter.fromMessage((Message<?>) value, (Class<?>) rawType);
|
||||
? this.fromMessage((Message<?>) value, (Class<?>) rawType, FunctionTypeUtils.getGenericType(type))
|
||||
: this.fromMessage((Message<?>) value, (Class<?>) rawType, null);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Converted from Message: " + convertedValue);
|
||||
}
|
||||
@@ -820,6 +821,55 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
return convertedValue;
|
||||
}
|
||||
|
||||
private Object fromMessage(Message<?> message, Class<?> rawType, Object conversionHint) {
|
||||
Stream<?> stream = null;
|
||||
if (message.getPayload() instanceof Collection) {
|
||||
stream = ((Collection<?>) message.getPayload()).stream();
|
||||
}
|
||||
else if (message.getPayload().getClass().isArray() && !(message.getPayload() instanceof byte[])) {
|
||||
stream = Stream.of(message.getPayload());
|
||||
}
|
||||
if (stream != null && !this.isJsonContentType(message)) {
|
||||
Collection<?> convertedCollection = stream.map(v -> {
|
||||
Message<?> m = new Message<Object>() {
|
||||
@Override
|
||||
public Object getPayload() {
|
||||
return v;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageHeaders getHeaders() {
|
||||
return message.getHeaders();
|
||||
}
|
||||
};
|
||||
if (conversionHint != null && conversionHint instanceof ParameterizedType) {
|
||||
Type tClass = FunctionTypeUtils.getImmediateGenericType((ParameterizedType) conversionHint, 0);
|
||||
if (byte[].class.isAssignableFrom((Class<?>) tClass)) {
|
||||
return message;
|
||||
}
|
||||
return messageConverter.fromMessage(m, (Class<?>) tClass);
|
||||
}
|
||||
|
||||
return messageConverter.fromMessage(m, rawType, conversionHint);
|
||||
|
||||
})
|
||||
.filter(v -> v != null).collect(Collectors.toList());
|
||||
return CollectionUtils.isEmpty(convertedCollection) ? null : convertedCollection;
|
||||
}
|
||||
else {
|
||||
return messageConverter.fromMessage(message, rawType, conversionHint);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isJsonContentType(Message<?> message) {
|
||||
Object ct = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
|
||||
if (ct != null) {
|
||||
ct = ct.toString();
|
||||
return ((String) ct).startsWith("application/json");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isJson(Object value) {
|
||||
String v = value instanceof byte[]
|
||||
? new String((byte[]) value, StandardCharsets.UTF_8)
|
||||
|
||||
Reference in New Issue
Block a user