diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index bc11e4a4b..5b2e91419 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -688,6 +688,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect inputValue = this.extractValueFromOriginalValueHolderIfNecessary(value); } + if (inputValue instanceof Message && !this.isInputTypeMessage()) { + inputValue = ((Message) inputValue).getPayload(); + } Object result = ((Function) this.target).apply(inputValue); return value instanceof OriginalMessageHolder @@ -985,7 +988,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect convertedInput = message; } else { - convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build(); + if (!(convertedInput instanceof Message)) { + convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build(); + } } } return convertedInput; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java new file mode 100644 index 000000000..dbd393b6d --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java @@ -0,0 +1,64 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.config; + +import java.lang.reflect.Type; +import java.util.Collection; +import java.util.Map; + +import org.springframework.cloud.function.json.JsonMapper; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.MimeType; + +/** + * Implementation of {@link MessageConverter} which uses Jackson or Gson libraries to do the + * actual conversion via {@link JsonMapper} instance. + * + * @author Oleg Zhurakousky + * + * @since 3.1.0 + */ +public class CloudEventJsonMessageConverter extends JsonMessageConverter { + + private final JsonMapper mapper; + + public CloudEventJsonMessageConverter(JsonMapper jsonMapper) { + super(jsonMapper, new MimeType("application", "cloudevents+json")); + this.mapper = jsonMapper; + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, @Nullable Object conversionHint) { + Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint; + String jsonString = (String) message.getPayload(); + Map mapEvent = this.mapper.fromJson(jsonString, Map.class); + Object payload = this.mapper.fromJson(this.mapper.toJson(mapEvent.get("data")), convertToType); + mapEvent.remove("data"); + return MessageBuilder.withPayload(payload).copyHeaders(mapEvent).build(); + } + + @Override + protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, + @Nullable Object conversionHint) { + throw new UnsupportedOperationException("Temporarily not supported as this converter is work in progress"); + } + +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 8aaa824f6..66ac8e091 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -97,11 +97,11 @@ public class ContextFunctionCatalogAutoConfiguration { .collect(Collectors.toList()); mcList.add(new JsonMessageConverter(jsonMapper)); + mcList.add(new CloudEventJsonMessageConverter(jsonMapper)); mcList.add(new ByteArrayMessageConverter()); mcList.add(new StringMessageConverter()); mcList.add(new PrimitiveTypesFromStringMessageConverter(conversionService)); - if (!CollectionUtils.isEmpty(mcList)) { messageConverter = new SmartCompositeMessageConverter(mcList); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java index 05fe75cc1..cb7a16798 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java @@ -78,14 +78,20 @@ public class JsonMessageConverter extends AbstractMessageConverter { return message.getPayload(); } Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint; - try { - return this.jsonMapper.fromJson(message.getPayload(), convertToType); + if (targetClass == byte[].class && message.getPayload() instanceof String) { + return ((String) message.getPayload()).getBytes(StandardCharsets.UTF_8); } - catch (Exception e) { - if (message.getPayload() instanceof byte[] && targetClass.isAssignableFrom(String.class)) { - return new String((byte[]) message.getPayload(), StandardCharsets.UTF_8); + else { + try { + return this.jsonMapper.fromJson(message.getPayload(), convertToType); + } + catch (Exception e) { + if (message.getPayload() instanceof byte[] && targetClass.isAssignableFrom(String.class)) { + return new String((byte[]) message.getPayload(), StandardCharsets.UTF_8); + } } } + return null; }