diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java
new file mode 100644
index 000000000..e0998d6da
--- /dev/null
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.cloudevent;
+
+import java.util.Map;
+import java.util.function.Function;
+
+import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.converter.ContentTypeResolver;
+import org.springframework.messaging.converter.DefaultContentTypeResolver;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.Assert;
+import org.springframework.util.MimeType;
+import org.springframework.util.MimeTypeUtils;
+
+/**
+ * A Cloud Events specific pre-processor that is added to {@link SmartCompositeMessageConverter}
+ * to potentially modify incoming message.
+ *
+ * For Cloud Event coming in binary-mode such modification implies determining
+ * content type of the 'data' attribute (see {@link #getDataContentType(MessageHeaders)}
+ * of Cloud Event and creating a new {@link Message} with its `contentType` set to such
+ * content type while copying the rest of the headers.
+ *
+ * Similar to Cloud Event coming in binary-mode, the Cloud Event coming in structured-mode
+ * such modification also implies determining content type of the 'data' attribute
+ * (see {@link #getDataContentType(MessageHeaders)}...
+ *
+ * @author Oleg Zhurakousky
+ * @since 3.1
+ */
+public class CloudEventDataContentTypeMessagePreProcessor implements Function, Message>> {
+
+ private final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver();
+
+ private final MimeType cloudEventContentType = MimeTypeUtils.parseMimeType("application/cloudevents");
+
+ private final CompositeMessageConverter messageConverter;
+
+ public CloudEventDataContentTypeMessagePreProcessor(CompositeMessageConverter messageConverter) {
+ Assert.notNull(messageConverter, "'messageConverter' must not be null");
+ this.messageConverter = messageConverter;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Message> apply(Message> inputMessage) {
+ if (CloudEventUtils.isBinary(inputMessage)) {
+ String dataContentType = this.getDataContentType(inputMessage.getHeaders());
+ Message> message = MessageBuilder.fromMessage(inputMessage)
+ .setHeader(MessageHeaders.CONTENT_TYPE, dataContentType)
+// .setHeader("originalContentType", inputMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE)) not sure about it
+ .build();
+ return message;
+ }
+ else if (this.isStructured(inputMessage)) {
+ MimeType contentType = this.contentTypeResolver.resolve(inputMessage.getHeaders());
+ String dataContentType = this.getDataContentType(inputMessage.getHeaders());
+ String suffix = contentType.getSubtypeSuffix();
+ MimeType cloudEventDeserializationContentType = MimeTypeUtils
+ .parseMimeType(contentType.getType() + "/" + suffix);
+ Message> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)
+ .setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType)
+ .setHeader(CloudEventUtils.CE_DATACONTENTTYPE, dataContentType).build();
+ Map structuredCloudEvent = (Map) this.messageConverter
+ .fromMessage(cloudEventMessage, Map.class);
+ Message> binaryCeMessage = this.buildCeMessageFromStructured(structuredCloudEvent);
+ return binaryCeMessage;
+ }
+ else {
+ return inputMessage;
+ }
+ }
+
+ private Message> buildCeMessageFromStructured(Map structuredCloudEvent) {
+ MessageBuilder> builder = MessageBuilder.withPayload(structuredCloudEvent.get(CloudEventUtils.DATA));
+ structuredCloudEvent.remove(CloudEventUtils.DATA);
+ builder.copyHeaders(structuredCloudEvent);
+ return builder.build();
+ }
+
+ private String getDataContentType(MessageHeaders headers) {
+ if (headers.containsKey(CloudEventUtils.DATACONTENTTYPE)) {
+ return (String) headers.get(CloudEventUtils.DATACONTENTTYPE);
+ }
+ else if (headers.containsKey(CloudEventUtils.CE_DATACONTENTTYPE)) {
+ return (String) headers.get(CloudEventUtils.CE_DATACONTENTTYPE);
+ }
+ else if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
+ return headers.get(MessageHeaders.CONTENT_TYPE).toString();
+ }
+ return "application/json";
+ }
+
+ private boolean isStructured(Message> message) {
+ if (!CloudEventUtils.isBinary(message)) {
+ Map headers = message.getHeaders();
+
+ if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
+ MimeType contentType = this.contentTypeResolver.resolve(message.getHeaders());
+ if (contentType.getType().equals(this.cloudEventContentType.getType())
+ && contentType.getSubtype().startsWith(this.cloudEventContentType.getSubtype())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverter.java
new file mode 100644
index 000000000..1e8ed88ce
--- /dev/null
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverter.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.cloudevent;
+
+import org.springframework.cloud.function.context.config.JsonMessageConverter;
+import org.springframework.cloud.function.json.JsonMapper;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.util.MimeType;
+
+/**
+ * Implementation of {@link MessageConverter} which uses Jackson or Gson libraries to do the
+ * actual conversion via {@link JsonMapper} instance.
+ *
+ * @author Oleg Zhurakousky
+ *
+ * @since 3.1
+ */
+public class CloudEventJsonMessageConverter extends JsonMessageConverter {
+
+ public CloudEventJsonMessageConverter(JsonMapper jsonMapper) {
+ super(jsonMapper, new MimeType("application", "cloudevents+json"));
+ this.setStrictContentTypeMatch(true);
+ }
+}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventUtils.java
new file mode 100644
index 000000000..d972b1731
--- /dev/null
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventUtils.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.cloudevent;
+
+import java.util.Map;
+
+import org.springframework.messaging.Message;
+
+/**
+ * Miscellaneous utility methods to deal with Cloud Events - https://cloudevents.io/.
+ *
+ * Mainly for internal use within the framework;
+ *
+ * @author Oleg Zhurakousky
+ * @since 3.1
+ */
+public final class CloudEventUtils {
+
+ private CloudEventUtils() {
+
+ }
+
+ /**
+ * Prefix for attributes.
+ */
+ public static String ATTR_PREFIX = "ce_";
+
+ /**
+ * Value for 'data' attribute.
+ */
+ public static String DATA = "data";
+
+ /**
+ * Value for 'data' attribute with prefix.
+ */
+ public static String CE_DATA = ATTR_PREFIX + DATA;
+
+ /**
+ * Value for 'id' attribute.
+ */
+ public static String ID = "id";
+
+ /**
+ * Value for 'id' attribute with prefix.
+ */
+ public static String CE_ID = ATTR_PREFIX + ID;
+
+ /**
+ * Value for 'source' attribute.
+ */
+ public static String SOURCE = "source";
+
+ /**
+ * Value for 'source' attribute with prefix.
+ */
+ public static String CE_SOURCE = ATTR_PREFIX + SOURCE;
+
+ /**
+ * Value for 'specversion' attribute.
+ */
+ public static String SPECVERSION = "specversion";
+
+ /**
+ * Value for 'specversion' attribute with prefix.
+ */
+ public static String CE_SPECVERSION = ATTR_PREFIX + SPECVERSION;
+
+ /**
+ * Value for 'type' attribute.
+ */
+ public static String TYPE = "type";
+
+ /**
+ * Value for 'type' attribute with prefix.
+ */
+ public static String CE_TYPE = ATTR_PREFIX + TYPE;
+
+ /**
+ * Value for 'datacontenttype' attribute.
+ */
+ public static String DATACONTENTTYPE = "datacontenttype";
+
+ /**
+ * Value for 'datacontenttype' attribute with prefix.
+ */
+ public static String CE_DATACONTENTTYPE = ATTR_PREFIX + DATACONTENTTYPE;
+
+ /**
+ * Value for 'dataschema' attribute.
+ */
+ public static String DATASCHEMA = "dataschema";
+
+ /**
+ * Value for 'dataschema' attribute with prefix.
+ */
+ public static String CE_DATASCHEMA = ATTR_PREFIX + DATASCHEMA;
+
+ /**
+ * Value for 'subject' attribute.
+ */
+ public static String SUBJECT = "subject";
+
+ /**
+ * Value for 'subject' attribute with prefix.
+ */
+ public static String CE_SUBJECT = ATTR_PREFIX + SUBJECT;
+
+ /**
+ * Value for 'time' attribute.
+ */
+ public static String TIME = "time";
+
+ /**
+ * Value for 'time' attribute with prefix.
+ */
+ public static String CE_TIME = ATTR_PREFIX + TIME;
+
+ /**
+ * Checks if {@link Message} represents cloud event in binary-mode.
+ */
+ public static boolean isBinary(Message> message) {
+ Map headers = message.getHeaders();
+ return (headers.containsKey("id")
+ && headers.containsKey("source")
+ && headers.containsKey("specversion")
+ && headers.containsKey("type"))
+ ||
+ (headers.containsKey("ce_id")
+ && headers.containsKey("ce_source")
+ && headers.containsKey("ce_specversion")
+ && headers.containsKey("ce_type"));
+ }
+}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java
index 7d69c4e9a..d95e0b48d 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java
@@ -16,7 +16,6 @@
package org.springframework.cloud.function.context.catalog;
-import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java
deleted file mode 100644
index 73049b73c..000000000
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright 2020-2020 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.cloud.function.context.config;
-
-import java.lang.reflect.Type;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Map;
-
-import org.springframework.cloud.function.json.JsonMapper;
-import org.springframework.lang.Nullable;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.converter.MessageConverter;
-import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.util.MimeType;
-
-/**
- * Implementation of {@link MessageConverter} which uses Jackson or Gson libraries to do the
- * actual conversion via {@link JsonMapper} instance.
- *
- * @author Oleg Zhurakousky
- *
- * @since 3.1
- */
-public class CloudEventJsonMessageConverter extends JsonMessageConverter {
-
- private final JsonMapper mapper;
-
- public CloudEventJsonMessageConverter(JsonMapper jsonMapper) {
- super(jsonMapper, new MimeType("application", "cloudevents+json"));
- this.mapper = jsonMapper;
- }
-
- @Override
- protected Object convertFromInternal(Message> message, Class> targetClass, @Nullable Object conversionHint) {
- if (this.isBinary(message)) {
- return super.convertFromInternal(message, targetClass, conversionHint);
- }
- else {
- if (targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection>)) {
- return message.getPayload();
- }
- Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint;
- String jsonString = message.getPayload() instanceof String
- ? (String) message.getPayload()
- : new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
- Map mapEvent = this.mapper.fromJson(jsonString, Map.class);
- Object payload = this.mapper.fromJson(this.mapper.toJson(mapEvent.get("data")), convertToType);
- mapEvent.remove("data");
- return MessageBuilder.withPayload(payload).copyHeaders(mapEvent).build();
- }
- }
-
- private boolean isBinary(Message> message) {
- Map headers = message.getHeaders();
- return headers.containsKey("source") && headers.containsKey("specversion") && headers.containsKey("type");
- }
-}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java
index 66ac8e091..02fd37def 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java
@@ -30,6 +30,8 @@ import com.google.gson.Gson;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.function.cloudevent.CloudEventDataContentTypeMessagePreProcessor;
+import org.springframework.cloud.function.cloudevent.CloudEventJsonMessageConverter;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistry;
@@ -78,7 +80,7 @@ public class ContextFunctionCatalogAutoConfiguration {
conversionService.addConverter(converter);
}
- CompositeMessageConverter messageConverter = null;
+ SmartCompositeMessageConverter messageConverter = null;
List mcList = new ArrayList<>();
if (!CollectionUtils.isEmpty(messageConverters)) {
@@ -104,6 +106,8 @@ public class ContextFunctionCatalogAutoConfiguration {
if (!CollectionUtils.isEmpty(mcList)) {
messageConverter = new SmartCompositeMessageConverter(mcList);
+ CloudEventDataContentTypeMessagePreProcessor messagePreProcessor = new CloudEventDataContentTypeMessagePreProcessor(messageConverter);
+ messageConverter.setMessagePreProcessor(messagePreProcessor);
}
return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper);
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java
index 00288a85e..2b256b533 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java
@@ -18,6 +18,7 @@ package org.springframework.cloud.function.context.config;
import java.util.Collection;
import java.util.List;
+import java.util.function.Function;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
@@ -37,6 +38,8 @@ import org.springframework.util.StringUtils;
*/
public class SmartCompositeMessageConverter extends CompositeMessageConverter {
+ private Function, Message>> preProcessor;
+
public SmartCompositeMessageConverter(Collection converters) {
super(converters);
}
@@ -44,6 +47,9 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
@Override
@Nullable
public Object fromMessage(Message> message, Class> targetClass) {
+ if (this.preProcessor != null) {
+ message = this.preProcessor.apply(message);
+ }
for (MessageConverter converter : getConverters()) {
Object result = converter.fromMessage(message, targetClass);
if (result != null) {
@@ -56,6 +62,9 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
@Override
@Nullable
public Object fromMessage(Message> message, Class> targetClass, @Nullable Object conversionHint) {
+ if (this.preProcessor != null) {
+ message = this.preProcessor.apply(message);
+ }
for (MessageConverter converter : getConverters()) {
Object result = (converter instanceof SmartMessageConverter ?
((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) :
@@ -133,4 +142,8 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
}
return null;
}
+
+ public void setMessagePreProcessor(Function, Message>> preProcessor) {
+ this.preProcessor = preProcessor;
+ }
}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JacksonMapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JacksonMapper.java
index d2dff9a64..c8d1f9f5f 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JacksonMapper.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JacksonMapper.java
@@ -18,6 +18,7 @@ package org.springframework.cloud.function.json;
import java.io.Reader;
import java.lang.reflect.Type;
+import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
@@ -56,6 +57,9 @@ public class JacksonMapper extends JsonMapper {
else if (json instanceof Reader) {
convertedValue = this.mapper.readValue((Reader) json, constructType);
}
+ else if (json instanceof Map) {
+ convertedValue = this.mapper.convertValue(json, constructType);
+ }
}
catch (Exception e) {
throw new IllegalStateException("Failed to convert. Possible bug as the conversion probably shouldn't have been attampted here", e);
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JsonMapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JsonMapper.java
index e6f0cdb60..dfaf1382d 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JsonMapper.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JsonMapper.java
@@ -16,6 +16,7 @@
package org.springframework.cloud.function.json;
+import java.io.Reader;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -77,6 +78,15 @@ public abstract class JsonMapper {
return (T) results;
}
else {
+ if (!(json instanceof String) && !(json instanceof byte[]) && !(json instanceof Reader)) {
+ json = this.toJson(json);
+ if (FunctionTypeUtils.getRawType(type) == String.class) {
+ return (T) new String((byte[]) json, StandardCharsets.UTF_8);
+ }
+ else if (FunctionTypeUtils.getRawType(type) == byte[].class) {
+ return (T) json;
+ }
+ }
return this.doFromJson(json, type);
}
}
diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverterTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverterTests.java
new file mode 100644
index 000000000..558e21192
--- /dev/null
+++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverterTests.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.cloudevent;
+
+import java.lang.reflect.Field;
+import java.util.UUID;
+
+import org.junit.jupiter.api.Test;
+
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.cloud.function.context.FunctionCatalog;
+import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
+import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.util.ReflectionUtils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ *
+ * @author Oleg Zhurakousky
+ *
+ */
+public class CloudEventJsonMessageConverterTests {
+ @Test
+ public void testFromMessageBinaryPayloadMatchesType() {
+ SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
+ Message message = MessageBuilder.withPayload("Hello Ricky").setHeader("ce_source", "https://spring.io/")
+ .setHeader("ce_id", UUID.randomUUID().toString()).setHeader("ce_type", "org.springframework")
+ .setHeader("ce_specversion", "1.0").setHeader("ce_datacontenttype", "text/plain").build();
+
+ String converted = (String) messageConverter.fromMessage(message, String.class);
+ assertThat(converted).isEqualTo("Hello Ricky");
+ }
+
+ @Test
+ public void testFromMessageBinaryPayloadDoesNotMatchType() {
+ SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
+ Message message = MessageBuilder.withPayload("Hello Ricky".getBytes())
+ .setHeader(MessageHeaders.CONTENT_TYPE,
+ MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
+ .setHeader("ce_source", "https://spring.io/").setHeader("ce_id", UUID.randomUUID().toString())
+ .setHeader("ce_type", "org.springframework").setHeader("ce_specversion", "1.0")
+ .setHeader("ce_datacontenttype", "text/plain").build();
+ String converted = (String) messageConverter.fromMessage(message, String.class);
+ assertThat(converted).isEqualTo("Hello Ricky");
+ }
+
+ @Test // JsonMessageConverter does some special things between byte[] and String so
+ // this works
+ public void testFromMessageBinaryPayloadNoDataContentTypeToString() {
+ SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
+ Message message = MessageBuilder.withPayload("Hello Ricky".getBytes())
+ .setHeader(MessageHeaders.CONTENT_TYPE,
+ MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
+ .setHeader("ce_source", "https://spring.io/").setHeader("ce_id", UUID.randomUUID().toString())
+ .setHeader("ce_type", "org.springframework").setHeader("ce_specversion", "1.0").build();
+ String converted = (String) messageConverter.fromMessage(message, String.class);
+ assertThat(converted).isEqualTo("Hello Ricky");
+ }
+
+ @Test // Unlike the previous test the type here is POJO so no special treatement
+ public void testFromMessageBinaryPayloadNoDataContentTypeToPOJO() {
+ SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
+ Message message = MessageBuilder.withPayload("Hello Ricky".getBytes())
+ .setHeader(MessageHeaders.CONTENT_TYPE,
+ MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
+ .setHeader("ce_source", "https://spring.io/").setHeader("ce_id", UUID.randomUUID().toString())
+ .setHeader("ce_type", "org.springframework").setHeader("ce_specversion", "1.0").build();
+ String converted = (String) messageConverter.fromMessage(message, Person.class);
+ assertThat(converted).isNull();
+ }
+
+ @Test // will fall on default CT which is json
+ public void testFromMessageBinaryPayloadNoDataContentTypeToPOJOThatWorks() {
+ SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
+ Message message = MessageBuilder.withPayload("{\"name\":\"Ricky\"}".getBytes())
+ .setHeader(MessageHeaders.CONTENT_TYPE,
+ MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
+ .setHeader("ce_source", "https://spring.io/").setHeader("ce_id", UUID.randomUUID().toString())
+ .setHeader("ce_type", "org.springframework").setHeader("ce_specversion", "1.0").build();
+ Person converted = (Person) messageConverter.fromMessage(message, Person.class);
+ assertThat(converted.getName()).isEqualTo("Ricky");
+ }
+
+ @Test // will fall on default CT which is json
+ public void testFromMessageStructured() {
+ String cloudEventStructured = "{\n" + " \"specversion\" : \"1.0\",\n"
+ + " \"type\" : \"org.springframework\",\n" + " \"source\" : \"https://spring.io/\",\n"
+ + " \"id\" : \"A234-1234-1234\",\n" + " \"datacontenttype\" : \"application/json\",\n"
+ + " \"data\" : {\n" + " \"version\" : \"1.0\",\n"
+ + " \"releaseName\" : \"Spring Framework\",\n" + " \"releaseDate\" : \"24-03-2004\"\n"
+ + " }\n" + " }";
+ SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
+ Message message = MessageBuilder.withPayload(cloudEventStructured)
+ .setHeader(MessageHeaders.CONTENT_TYPE,
+ MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
+ .setHeader("ce_datacontenttype", "application/json").build();
+ SpringReleaseEvent springReleaseEvent = (SpringReleaseEvent) messageConverter.fromMessage(message,
+ SpringReleaseEvent.class);
+ assertThat(springReleaseEvent.getReleaseName()).isEqualTo("Spring Framework");
+ assertThat(springReleaseEvent.getVersion()).isEqualTo("1.0");
+ }
+
+ private SmartCompositeMessageConverter configure(Class>... configClass) {
+ ApplicationContext context = new SpringApplicationBuilder(configClass).run(
+ "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.main.lazy-initialization=true");
+ FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
+ Field f = ReflectionUtils.findField(BeanFactoryAwareFunctionRegistry.class, "messageConverter");
+ f.setAccessible(true);
+ try {
+ SmartCompositeMessageConverter messageConverter = (SmartCompositeMessageConverter) f.get(catalog);
+ return messageConverter;
+ }
+ catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @EnableAutoConfiguration
+ @Configuration
+ public static class DummyConfiguration {
+ }
+
+ public static class Person {
+ private String name;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+ }
+}
diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/SpringReleaseEvent.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/SpringReleaseEvent.java
new file mode 100644
index 000000000..6168b7a50
--- /dev/null
+++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/SpringReleaseEvent.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.cloudevent;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+
+/**
+ * An example POJO that represents cloud event data
+ *
+ * @author Oleg Zhurakousky
+ *
+ */
+public class SpringReleaseEvent {
+
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy")
+ private Date releaseDate;
+
+ private String releaseName;
+
+ private String version;
+
+ public Date getReleaseDate() {
+ return releaseDate;
+ }
+
+ public void setReleaseDate(Date releaseDate) {
+ this.releaseDate = releaseDate;
+ }
+
+ public String getReleaseName() {
+ return releaseName;
+ }
+
+ public void setReleaseName(String releaseName) {
+ this.releaseName = releaseName;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ @Override
+ public String toString() {
+ return "releaseDate:" + new SimpleDateFormat("dd-MM-yyyy").format(releaseDate) + "; releaseName:" + releaseName + "; version:" + version;
+ }
+}
diff --git a/spring-cloud-function-samples/function-sample-cloudevent/README.adoc b/spring-cloud-function-samples/function-sample-cloudevent/README.adoc
index 470dbdc54..68ad52aa2 100644
--- a/spring-cloud-function-samples/function-sample-cloudevent/README.adoc
+++ b/spring-cloud-function-samples/function-sample-cloudevent/README.adoc
@@ -38,7 +38,24 @@ provides a good example on how to accomplish this.
### Function as a REST endpoint
Given that SCF allows function to be exposed as REST endpoints, you can post cloud event to any of the
-functions by using function name as path (e.g., `localhost:8080/`)
+functions by using function name as path (e.g., `localhost:8080/`).
+
+Just add this to your dependency
+
+[source, xml]
+----
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.springframework.cloud
+ spring-cloud-function-web
+ 3.1.0-SNAPSHOT
+
+
+----
Here is an example of curl command posting a cloud event in binary-mode:
diff --git a/spring-cloud-function-samples/function-sample-cloudevent/pom.xml b/spring-cloud-function-samples/function-sample-cloudevent/pom.xml
index 4076b43a1..0633844b5 100644
--- a/spring-cloud-function-samples/function-sample-cloudevent/pom.xml
+++ b/spring-cloud-function-samples/function-sample-cloudevent/pom.xml
@@ -2,28 +2,27 @@
4.0.0
-
- org.springframework.boot
- spring-boot-starter-parent
- 2.3.5.RELEASE
-
-
io.spring.sample
function-sample-cloudevent
0.0.1-SNAPSHOT
function-sample-cloudevent
Demo project for Spring Boot
+
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.4.0-RC1
+
+
- 8
+ 1.8
+ 3.1.0-SNAPSHOT
+ 1.0.21.RELEASE
-
- org.springframework.boot
- spring-boot-starter
-
-
org.springframework.boot
@@ -32,7 +31,7 @@
org.springframework.cloud
spring-cloud-function-web
- 3.1.0-SNAPSHOT
+
@@ -59,8 +58,7 @@
-
-
+
org.springframework.boot
spring-boot-starter-test
test
@@ -73,13 +71,112 @@
+
+
+
+ org.springframework.cloud
+ spring-cloud-function-dependencies
+ ${spring-cloud-function.version}
+ pom
+ import
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+
+ true
+
+
org.springframework.boot
spring-boot-maven-plugin
+
+
+ org.springframework.boot.experimental
+ spring-boot-thin-layout
+ ${wrapper.version}
+
+
+
+
+ maven-surefire-plugin
+
+
+ **/*Tests.java
+ **/*Test.java
+
+
+ **/Abstract*.java
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/libs-snapshot-local
+
+ true
+
+
+ false
+
+
+
+ spring-milestones
+ Spring Milestones
+ https://repo.spring.io/libs-milestone-local
+
+ false
+
+
+
+ spring-releases
+ Spring Releases
+ https://repo.spring.io/release
+
+ false
+
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/libs-snapshot-local
+
+ true
+
+
+ false
+
+
+
+ spring-milestones
+ Spring Milestones
+ https://repo.spring.io/libs-milestone-local
+
+ false
+
+
+
+ spring-releases
+ Spring Releases
+ https://repo.spring.io/libs-release-local
+
+ false
+
+
+
+
+
diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java
index f75c8d2da..1303158fd 100644
--- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java
+++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java
@@ -27,7 +27,7 @@ import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.web.client.TestRestTemplate;
-import org.springframework.cloud.function.context.config.CloudEventJsonMessageConverter;
+import org.springframework.cloud.function.cloudevent.CloudEventJsonMessageConverter;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -39,7 +39,9 @@ import org.springframework.http.ResponseEntity;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.util.MimeType;
import org.springframework.util.SocketUtils;
/**
@@ -190,12 +192,12 @@ public class CloudeventDemoApplicationRESTTests {
RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asStringMessage"));
ResponseEntity response = testRestTemplate.exchange(re, String.class);
- assertThat(response.getBody()).isEqualTo(payload);
+ assertThat(response.getBody()).isEqualTo("{\"version\":\"1.0\",\"releaseName\":\"Spring Framework\",\"releaseDate\":\"24-03-2004\"}");
re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asString"));
response = testRestTemplate.exchange(re, String.class);
- assertThat(response.getBody()).isEqualTo(payload);
+ assertThat(response.getBody()).isEqualTo("{\"version\":\"1.0\",\"releaseName\":\"Spring Framework\",\"releaseDate\":\"24-03-2004\"}");
}
@@ -207,10 +209,10 @@ public class CloudeventDemoApplicationRESTTests {
}
}
- public static class FooBarToCloudEventMessageConverter extends CloudEventJsonMessageConverter {
+ public static class FooBarToCloudEventMessageConverter extends AbstractMessageConverter {
public FooBarToCloudEventMessageConverter(JsonMapper jsonMapper) {
- super(jsonMapper);
+ super(new MimeType("foo", "bar"));
}
@Override