Add MessageConverter filtering logic
Aligned ApplicationJsonMessageMarshallingConverter with the same from s-c-Stream
This commit is contained in:
@@ -17,6 +17,7 @@
|
||||
package org.springframework.cloud.function.context.config;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.lang.reflect.Type;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
@@ -36,6 +37,8 @@ import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
|
||||
import org.springframework.messaging.converter.MessageConversionException;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* Variation of {@link MappingJackson2MessageConverter} to support marshalling and
|
||||
@@ -48,12 +51,16 @@ import org.springframework.messaging.converter.MessageConversionException;
|
||||
*/
|
||||
class ApplicationJsonMessageMarshallingConverter extends MappingJackson2MessageConverter {
|
||||
|
||||
private final Field headersField;
|
||||
|
||||
private final Map<Type, JavaType> typeCache = new ConcurrentHashMap<>();
|
||||
|
||||
ApplicationJsonMessageMarshallingConverter(@Nullable ObjectMapper objectMapper) {
|
||||
if (objectMapper != null) {
|
||||
this.setObjectMapper(objectMapper);
|
||||
}
|
||||
this.headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
|
||||
this.headersField.setAccessible(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -100,7 +107,6 @@ class ApplicationJsonMessageMarshallingConverter extends MappingJackson2MessageC
|
||||
else if (conversionHint instanceof ParameterizedType) {
|
||||
result = convertParameterizedType(message, (Type) conversionHint);
|
||||
}
|
||||
|
||||
if (result == null) {
|
||||
if (message.getPayload() instanceof byte[]
|
||||
&& targetClass.isAssignableFrom(String.class)) {
|
||||
@@ -146,6 +152,11 @@ class ApplicationJsonMessageMarshallingConverter extends MappingJackson2MessageC
|
||||
else if (value instanceof String) {
|
||||
return objectMapper.readValue((String) value, typeToUse.getContentType());
|
||||
}
|
||||
else {
|
||||
// fall back to simple type-conversion
|
||||
// see https://github.com/spring-cloud/spring-cloud-stream/issues/1898
|
||||
return objectMapper.convertValue(value, typeToUse.getContentType());
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("Failed to convert payload " + value, e);
|
||||
@@ -163,4 +174,17 @@ class ApplicationJsonMessageMarshallingConverter extends MappingJackson2MessageC
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
@Nullable
|
||||
protected MimeType getMimeType(@Nullable MessageHeaders headers) {
|
||||
Object contentType = headers.get(MessageHeaders.CONTENT_TYPE);
|
||||
if (contentType instanceof byte[]) {
|
||||
contentType = new String((byte[]) contentType, StandardCharsets.UTF_8);
|
||||
contentType = ((String) contentType).replace("\"", "");
|
||||
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils.getField(this.headersField, headers);
|
||||
headersMap.put(MessageHeaders.CONTENT_TYPE, contentType);
|
||||
}
|
||||
return super.getMimeType(headers);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,10 +18,10 @@ package org.springframework.cloud.function.context.config;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.gson.Gson;
|
||||
@@ -70,14 +70,16 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper";
|
||||
|
||||
@Bean
|
||||
public FunctionRegistry functionCatalog(Map<String, MessageConverter> messageConverters, @Nullable ObjectMapper objectMapper) {
|
||||
public FunctionRegistry functionCatalog(List<MessageConverter> messageConverters, @Nullable ObjectMapper objectMapper) {
|
||||
ConversionService conversionService = new DefaultConversionService();
|
||||
CompositeMessageConverter messageConverter = null;
|
||||
messageConverters = messageConverters.stream()
|
||||
.filter(c -> isConverterEligible(c)).collect(Collectors.toList());
|
||||
List<MessageConverter> mcList = new ArrayList<>();
|
||||
boolean addDefaultConverters = true;
|
||||
|
||||
if (!CollectionUtils.isEmpty(messageConverters)) {
|
||||
for (MessageConverter mc : messageConverters.values()) {
|
||||
for (MessageConverter mc : messageConverters) {
|
||||
if (mc instanceof CompositeMessageConverter) {
|
||||
mcList.addAll(((CompositeMessageConverter) mc).getConverters());
|
||||
addDefaultConverters = false;
|
||||
@@ -104,6 +106,17 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
return new RoutingFunction(functionCatalog, functionInspector, functionProperties);
|
||||
}
|
||||
|
||||
private boolean isConverterEligible(Object messageConverter) {
|
||||
String messageConverterName = messageConverter.getClass().getName();
|
||||
if (messageConverterName.startsWith("org.springframework.cloud.")) {
|
||||
return true;
|
||||
}
|
||||
else if (!messageConverterName.startsWith("org.springframework.")) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@ComponentScan(basePackages = "${spring.cloud.function.scan.packages:functions}", //
|
||||
includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE,
|
||||
|
||||
Reference in New Issue
Block a user