From 8a032e7ed9455981dbf5c88f76c627893273083b Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 10 Nov 2020 14:50:09 +0100 Subject: [PATCH] GH-422 Initial support for CloudEvents Added initial implementation of MessageConverter At the moment there seem that MessageConverter(s) would be the only thing needed to integrate Cloud Events with various elements of Spring --- .../catalog/SimpleFunctionRegistry.java | 7 +- .../CloudEventJsonMessageConverter.java | 64 +++++++++++++++++++ ...ntextFunctionCatalogAutoConfiguration.java | 2 +- .../context/config/JsonMessageConverter.java | 16 +++-- 4 files changed, 82 insertions(+), 7 deletions(-) create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index bc11e4a4b..5b2e91419 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -688,6 +688,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect inputValue = this.extractValueFromOriginalValueHolderIfNecessary(value); } + if (inputValue instanceof Message && !this.isInputTypeMessage()) { + inputValue = ((Message) inputValue).getPayload(); + } Object result = ((Function) this.target).apply(inputValue); return value instanceof OriginalMessageHolder @@ -985,7 +988,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect convertedInput = message; } else { - convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build(); + if (!(convertedInput instanceof Message)) { + convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build(); + } } } return convertedInput; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java new file mode 100644 index 000000000..dbd393b6d --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java @@ -0,0 +1,64 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.config; + +import java.lang.reflect.Type; +import java.util.Collection; +import java.util.Map; + +import org.springframework.cloud.function.json.JsonMapper; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.MimeType; + +/** + * Implementation of {@link MessageConverter} which uses Jackson or Gson libraries to do the + * actual conversion via {@link JsonMapper} instance. + * + * @author Oleg Zhurakousky + * + * @since 3.1.0 + */ +public class CloudEventJsonMessageConverter extends JsonMessageConverter { + + private final JsonMapper mapper; + + public CloudEventJsonMessageConverter(JsonMapper jsonMapper) { + super(jsonMapper, new MimeType("application", "cloudevents+json")); + this.mapper = jsonMapper; + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, @Nullable Object conversionHint) { + Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint; + String jsonString = (String) message.getPayload(); + Map mapEvent = this.mapper.fromJson(jsonString, Map.class); + Object payload = this.mapper.fromJson(this.mapper.toJson(mapEvent.get("data")), convertToType); + mapEvent.remove("data"); + return MessageBuilder.withPayload(payload).copyHeaders(mapEvent).build(); + } + + @Override + protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, + @Nullable Object conversionHint) { + throw new UnsupportedOperationException("Temporarily not supported as this converter is work in progress"); + } + +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 8aaa824f6..66ac8e091 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -97,11 +97,11 @@ public class ContextFunctionCatalogAutoConfiguration { .collect(Collectors.toList()); mcList.add(new JsonMessageConverter(jsonMapper)); + mcList.add(new CloudEventJsonMessageConverter(jsonMapper)); mcList.add(new ByteArrayMessageConverter()); mcList.add(new StringMessageConverter()); mcList.add(new PrimitiveTypesFromStringMessageConverter(conversionService)); - if (!CollectionUtils.isEmpty(mcList)) { messageConverter = new SmartCompositeMessageConverter(mcList); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java index 05fe75cc1..cb7a16798 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java @@ -78,14 +78,20 @@ public class JsonMessageConverter extends AbstractMessageConverter { return message.getPayload(); } Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint; - try { - return this.jsonMapper.fromJson(message.getPayload(), convertToType); + if (targetClass == byte[].class && message.getPayload() instanceof String) { + return ((String) message.getPayload()).getBytes(StandardCharsets.UTF_8); } - catch (Exception e) { - if (message.getPayload() instanceof byte[] && targetClass.isAssignableFrom(String.class)) { - return new String((byte[]) message.getPayload(), StandardCharsets.UTF_8); + else { + try { + return this.jsonMapper.fromJson(message.getPayload(), convertToType); + } + catch (Exception e) { + if (message.getPayload() instanceof byte[] && targetClass.isAssignableFrom(String.class)) { + return new String((byte[]) message.getPayload(), StandardCharsets.UTF_8); + } } } + return null; }