A getAttribute(String attrName) {
- if (this.containsKey(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attrName)) {
- return (A) this.get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attrName);
- }
- else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attrName)) {
- return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attrName);
- }
- return (A) this.get(attrName);
- }
-
- /**
- * Determines if this instance of {@link CloudEventAttributes} represents valid Cloud Event.
- * This implies that it contains all 4 required attributes (id, source, type & specversion)
- *
- * @return true if this instance represents a valid Cloud Event
- */
- public boolean isValidCloudEvent() {
- return StringUtils.hasText(this.getId())
- && StringUtils.hasText(this.getSource())
- && StringUtils.hasText(this.getSpecversion())
- && StringUtils.hasText(this.getType());
- }
-
- String getAttributeName(String attributeName) {
- if (this.containsKey(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attributeName)) {
- return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attributeName;
- }
- else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName)) {
- return CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName;
- }
- return attributeName;
- }
-
- private CloudEventAttributes setAttribute(String attrName, String attrValue) {
- if (StringUtils.hasText(this.prefixToUse)) {
- this.remove(this.getAttributeName(attrName));
- this.put(this.prefixToUse + attrName, attrValue);
- }
- else {
- this.put(this.getAttributeName(attrName), attrValue);
- }
- return this;
- }
-}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java
deleted file mode 100644
index 721dd91d2..000000000
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2020-2020 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.cloud.function.cloudevent;
-
-
-/**
- * Strategy that should be implemented by the user to help with outgoing Cloud Event attributes.
- *
- * The provided `attributes` are already initialized with default values, so you can only set the ones that you need.
- *
- * Once implemented, simply configure it as a bean and the framework will invoke it before the outbound Cloud Event Message is finalized.
- *
- * {@code
- * @Bean
- * public CloudEventAttributesProvider cloudEventAttributesProvider() {
- * return attributes -> {
- * attributes.setSource("https://interface21.com/").setType("com.interface21");
- * };
- * }}
- *
- *
- * @author Oleg Zhurakousky
- * @author Dave Syer
- *
- * @since 3.1
- */
-@FunctionalInterface
-public interface CloudEventAttributesProvider {
- /**
- *
- * @param attributes instance of {@link CloudEventAttributes}
- */
- void generateDefaultCloudEventHeaders(CloudEventAttributes attributes);
-}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventHeaderEnricher.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventHeaderEnricher.java
new file mode 100644
index 000000000..485fef3a1
--- /dev/null
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventHeaderEnricher.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.cloudevent;
+
+
+
+/**
+ * Strategy that should be implemented by the user to help with outgoing Cloud Event
+ * headers.
+ *
+ * NOTE: The provided instance of {@link CloudEventMessageBuilder} may or may not be initialized
+ * with default values, so it is the responsibility of the user to ensure that all required Cloud Events
+ * attributes are set. That said, Spring frameworks which utilize this interface
+ * will ensure that the provided {@link CloudEventMessageBuilder} is initialized with default values, leaving
+ * you responsible to only set the attributes you need.
+ * Once implemented, simply configure it as a bean and the framework will invoke it before
+ * the outbound Cloud Event Message is finalized.
+ *
+ *
+ * @Bean
+ * public CloudEventHeadersProvider cloudEventHeadersProvider() {
+ * return attributes ->
+ * CloudEventHeaderUtils.fromAttributes(attributes).withSource("https://interface21.com/").withType("com.interface21").build();
+ * }
+ *
+ *
+ * @author Oleg Zhurakousky
+ * @author Dave Syer
+ * @since 2.0
+ */
+@FunctionalInterface
+public interface CloudEventHeaderEnricher {
+
+ /**
+ * @param attributes instance of {@link CloudEventContext}
+ * @return modified {@link CloudEventContext}
+ */
+ CloudEventMessageBuilder> enrich(CloudEventMessageBuilder> messageBuilder);
+
+}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java
new file mode 100644
index 000000000..0d2fba37d
--- /dev/null
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.cloudevent;
+
+import java.net.URI;
+import java.time.OffsetTime;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.GenericMessage;
+
+/**
+ * Message builder which is aware of Cloud Event semantics.
+ *
+ * @author Oleg Zhurakousky
+ * @since 3.1
+ */
+public final class CloudEventMessageBuilder {
+
+ protected Log logger = LogFactory.getLog(this.getClass());
+
+ private final Map headers;
+
+ private T data;
+
+ private CloudEventMessageBuilder(Map headers) {
+ this.headers = headers == null ? new HashMap<>() : headers;
+ }
+
+ public static CloudEventMessageBuilder withData(T data) {
+ CloudEventMessageBuilder builder = new CloudEventMessageBuilder(null);
+ builder.data = data;
+ return builder;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static CloudEventMessageBuilder fromMessage(Message> message) {
+ CloudEventMessageBuilder builder = new CloudEventMessageBuilder(new HashMap<>(message.getHeaders()));
+ builder.data = (T) message.getPayload();
+ return builder;
+ }
+
+ public CloudEventMessageBuilder setId(String id) {
+ this.headers.put(CloudEventMessageUtils.ID, id);
+ return this;
+ }
+
+ public CloudEventMessageBuilder setSource(URI uri) {
+ this.headers.put(CloudEventMessageUtils.SOURCE, uri);
+ return this;
+ }
+
+ public CloudEventMessageBuilder setSource(String uri) {
+ this.headers.put(CloudEventMessageUtils.SOURCE, URI.create(uri));
+ return this;
+ }
+
+ public CloudEventMessageBuilder setSpecVersion(String specversion) {
+ this.headers.put(CloudEventMessageUtils.SPECVERSION, specversion);
+ return this;
+ }
+
+ public CloudEventMessageBuilder setType(String type) {
+ this.headers.put(CloudEventMessageUtils.TYPE, type);
+ return this;
+ }
+
+ public CloudEventMessageBuilder setDataContentType(String dataContentType) {
+ this.headers.put(CloudEventMessageUtils.DATACONTENTTYPE, dataContentType);
+ return this;
+ }
+
+ public CloudEventMessageBuilder setDataSchema(URI dataSchema) {
+ this.headers.put(CloudEventMessageUtils.DATASCHEMA, dataSchema);
+ return this;
+ }
+
+ public CloudEventMessageBuilder setDataSchema(String dataSchema) {
+ this.headers.put(CloudEventMessageUtils.DATASCHEMA, URI.create(dataSchema));
+ return this;
+ }
+
+ public CloudEventMessageBuilder setSubject(String subject) {
+ this.headers.put(CloudEventMessageUtils.SUBJECT, subject);
+ return this;
+ }
+
+ public CloudEventMessageBuilder copyHeaders(Map headers) {
+ this.headers.putAll(headers);
+ return this;
+ }
+
+ public CloudEventMessageBuilder setTime(OffsetTime time) {
+ this.headers.put(CloudEventMessageUtils.TIME, time);
+ return this;
+ }
+
+ public CloudEventMessageBuilder setTime(String time) {
+ this.headers.put(CloudEventMessageUtils.TIME, OffsetTime.parse(time));
+ return this;
+ }
+
+ public CloudEventMessageBuilder setHeader(String key, Object value) {
+ this.headers.put(key, value);
+ return this;
+ }
+
+ /**
+ * Returns a snapshot of the headers {@link Map} at the time this method is called.
+ * The returned Map is read-only.
+ *
+ * @return map of headers
+ */
+ public Map toHeadersMap() {
+ return Collections.unmodifiableMap(this.headers);
+ }
+
+ public Message build() {
+ if (!this.headers.containsKey(CloudEventMessageUtils.SPECVERSION)) {
+ this.headers.put(CloudEventMessageUtils.SPECVERSION, "1.0");
+ }
+ return this.doBuild();
+ }
+
+ public Message build(String attributePrefixToUse) {
+ String[] keys = this.headers.keySet().toArray(new String[] {});
+ for (String key : keys) {
+ Object value = this.headers.remove(key);
+ this.headers.put(attributePrefixToUse + key, value);
+ }
+ if (!this.headers.containsKey(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION)) {
+ this.headers.put(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION, "1.0");
+ }
+ return build();
+ }
+
+ private Message doBuild() {
+ this.headers.put("message-type", "cloudevent");
+ CloudEventMessageHeaders headers = new CloudEventMessageHeaders(this.headers, this.getUUID(), null);
+ GenericMessage message = new GenericMessage(data, headers);
+ return message;
+ }
+
+ private UUID getUUID() {
+ UUID id = null;
+ if (this.headers.containsKey(CloudEventMessageUtils.ID)) {
+ String stringId = this.headers.get(CloudEventMessageUtils.ID).toString();
+ try {
+ id = UUID.fromString(stringId);
+ System.out.println(stringId);
+ System.out.println(id.toString());
+ }
+ catch (Exception e) {
+ logger.info("Provided Cloud Event 'id' is not compatible with Message 'id' which is UUID, "
+ + "therefore Cloud Event 'id' will be written as '_id' message header");
+ this.headers.put("_" + CloudEventMessageUtils.ID, stringId);
+ this.headers.remove(CloudEventMessageUtils.ID);
+ }
+ }
+ return id;
+ }
+
+ private static class CloudEventMessageHeaders extends MessageHeaders {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -6424866731588545945L;
+
+ protected CloudEventMessageHeaders(Map headers, UUID id, Long timestamp) {
+ super(headers, id, timestamp);
+ }
+
+ }
+}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java
index 36843c4d3..e45cb1f85 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java
@@ -16,10 +16,12 @@
package org.springframework.cloud.function.cloudevent;
-import java.util.HashMap;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.time.OffsetTime;
+import java.util.Collections;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@@ -27,19 +29,19 @@ import org.springframework.messaging.converter.ContentTypeResolver;
import org.springframework.messaging.converter.DefaultContentTypeResolver;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
+import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
/**
- * Miscellaneous utility methods to deal with Cloud Events - https://cloudevents.io/.
- *
- * Primarily intended for the internal use within the framework;
+ * Miscellaneous utility methods to assist with representing Cloud Event as Spring
+ * {@link Message}.
+ * Primarily intended for the internal use within Spring-based frameworks and
+ * integrations.
*
* @author Oleg Zhurakousky
* @author Dave Syer
- *
* @since 3.1
*/
public final class CloudEventMessageUtils {
@@ -61,280 +63,254 @@ public final class CloudEventMessageUtils {
public static MimeType APPLICATION_CLOUDEVENTS = MimeTypeUtils.parseMimeType(APPLICATION_CLOUDEVENTS_VALUE);
/**
- * Default attributes prefix which also suits Kafka.
+ * Prefix for attributes.
*/
public static String DEFAULT_ATTR_PREFIX = "ce_";
- /**
- * HTTP attributes prefix.
- */
- public static String HTTP_ATTR_PREFIX = "ce-";
-
/**
* AMQP attributes prefix.
*/
public static String AMQP_ATTR_PREFIX = "cloudEvents:";
+ /**
+ * Prefix for attributes.
+ */
+ public static String HTTP_ATTR_PREFIX = "ce-";
+
/**
* Value for 'data' attribute.
*/
public static String DATA = "data";
- /**
- * Value for 'data' attribute with prefix.
- */
- public static String CANONICAL_DATA = DEFAULT_ATTR_PREFIX + DATA;
-
/**
* Value for 'id' attribute.
*/
public static String ID = "id";
- /**
- * Value for 'id' attribute with prefix.
- */
- public static String CANONICAL_ID = DEFAULT_ATTR_PREFIX + ID;
-
/**
* Value for 'source' attribute.
*/
public static String SOURCE = "source";
- /**
- * Value for 'source' attribute with prefix.
- */
- public static String CANONICAL_SOURCE = DEFAULT_ATTR_PREFIX + SOURCE;
-
/**
* Value for 'specversion' attribute.
*/
public static String SPECVERSION = "specversion";
- /**
- * Value for 'specversion' attribute with prefix.
- */
- public static String CANONICAL_SPECVERSION = DEFAULT_ATTR_PREFIX + SPECVERSION;
-
/**
* Value for 'type' attribute.
*/
public static String TYPE = "type";
- /**
- * Value for 'type' attribute with prefix.
- */
- public static String CANONICAL_TYPE = DEFAULT_ATTR_PREFIX + TYPE;
-
/**
* Value for 'datacontenttype' attribute.
*/
public static String DATACONTENTTYPE = "datacontenttype";
- /**
- * Value for 'datacontenttype' attribute with prefix.
- */
- public static String CANONICAL_DATACONTENTTYPE = DEFAULT_ATTR_PREFIX + DATACONTENTTYPE;
-
/**
* Value for 'dataschema' attribute.
*/
public static String DATASCHEMA = "dataschema";
/**
- * Value for 'dataschema' attribute with prefix.
+ * V03 name for 'dataschema' attribute.
*/
- public static String CANONICAL_DATASCHEMA = DEFAULT_ATTR_PREFIX + DATASCHEMA;
+ public static final String SCHEMAURL = "schemaurl";
/**
* Value for 'subject' attribute.
*/
public static String SUBJECT = "subject";
- /**
- * Value for 'subject' attribute with prefix.
- */
- public static String CANONICAL_SUBJECT = DEFAULT_ATTR_PREFIX + SUBJECT;
-
/**
* Value for 'time' attribute.
*/
public static String TIME = "time";
- /**
- * Value for 'time' attribute with prefix.
- */
- public static String CANONICAL_TIME = DEFAULT_ATTR_PREFIX + TIME;
-
- /**
- * Checks if {@link Message} represents cloud event in binary-mode.
- */
- public static boolean isBinary(Map headers) {
- CloudEventAttributes attributes = new CloudEventAttributes(headers);
- return attributes.isValidCloudEvent();
+ public static String getId(Message> message) {
+ if (message.getHeaders().containsKey("_id")) {
+ return (String) message.getHeaders().get("_id");
+ }
+ String prefix = determinePrefixToUse(message.getHeaders());
+ return (String) message.getHeaders().get(prefix + MessageHeaders.ID);
}
- /**
- * Will construct instance of {@link CloudEventAttributes} setting its required attributes.
- *
- * @param ce_id value for Cloud Event 'id' attribute
- * @param ce_specversion value for Cloud Event 'specversion' attribute
- * @param ce_source value for Cloud Event 'source' attribute
- * @param ce_type value for Cloud Event 'type' attribute
- * @return instance of {@link CloudEventAttributes}
- */
- public static CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type) {
- Assert.hasText(ce_id, "'ce_id' must not be null or empty");
- Assert.hasText(ce_specversion, "'ce_specversion' must not be null or empty");
- Assert.hasText(ce_source, "'ce_source' must not be null or empty");
- Assert.hasText(ce_type, "'ce_type' must not be null or empty");
- Map requiredAttributes = new HashMap<>();
- requiredAttributes.put(CloudEventMessageUtils.CANONICAL_ID, ce_id);
- requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SPECVERSION, ce_specversion);
- requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SOURCE, ce_source);
- requiredAttributes.put(CloudEventMessageUtils.CANONICAL_TYPE, ce_type);
- return new CloudEventAttributes(requiredAttributes);
+ public static URI getSource(Message> message) {
+ String prefix = determinePrefixToUse(message.getHeaders());
+ return safeGetURI(message.getHeaders(), prefix + SOURCE);
}
- /**
- * Will construct instance of {@link CloudEventAttributes}
- * Should default/generate cloud event ID and SPECVERSION.
- *
- * @param ce_source value for Cloud Event 'source' attribute
- * @param ce_type value for Cloud Event 'type' attribute
- * @return instance of {@link CloudEventAttributes}
- */
- public static CloudEventAttributes get(String ce_source, String ce_type) {
- return get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type);
+ public static String getSpecVersion(Message> message) {
+ String prefix = determinePrefixToUse(message.getHeaders());
+ return (String) message.getHeaders().get(prefix + SPECVERSION);
+ }
+
+ public static String getType(Message> message) {
+ String prefix = determinePrefixToUse(message.getHeaders());
+ return (String) message.getHeaders().get(prefix + TYPE);
+ }
+
+ public static String getDataContentType(Message> message) {
+ String prefix = determinePrefixToUse(message.getHeaders());
+ return (String) message.getHeaders().get(prefix + DATACONTENTTYPE);
+ }
+
+ public static URI getDataSchema(Message> message) {
+ String prefix = determinePrefixToUse(message.getHeaders());
+ return safeGetURI(message.getHeaders(), prefix + DATASCHEMA);
+ }
+
+ public static String getSubject(Message> message) {
+ String prefix = determinePrefixToUse(message.getHeaders());
+ return (String) message.getHeaders().get(prefix + SUBJECT);
+ }
+
+ public static OffsetTime getTime(Message> message) {
+ String prefix = determinePrefixToUse(message.getHeaders());
+ return (OffsetTime) message.getHeaders().get(prefix + TIME);
}
- /**
- * Will attempt to convert 'inputMessage' to a binary-mode Cloud Event {@link Message}.
- * This typically happens when 'inputMessage' represents Cloud Event in structured-mode.
- *
- * In the event the message already represents Cloud Event in binary-mode, or this
- * message does not represent Cloud Event at all, it will be returned unchanged.
- *
- * @param inputMessage instance of incoming {@link Message}
- * @param messageConverter instance of {@link MessageConverter} to assist with type conversion.
- * @return instance of {@link Message} representing Cloud Event in binary-mode or unchanged 'inputMessage'.
- */
@SuppressWarnings("unchecked")
- public static Message> toBinary(Message> inputMessage, MessageConverter messageConverter) {
+ protected static Message> toCannonical(Message> inputMessage, MessageConverter messageConverter) {
- Map headers = inputMessage.getHeaders();
- CloudEventAttributes attributes = new CloudEventAttributes(headers);
+ Field headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
+ headersField.setAccessible(true);
+ Map headers = (Map) ReflectionUtils.getField(headersField, inputMessage.getHeaders());
+ canonicalizeHeaders(headers);
+ String inputContentType = (String) inputMessage.getHeaders().get(DATACONTENTTYPE);
// first check the obvious and see if content-type is `cloudevents`
- if (!attributes.isValidCloudEvent() && headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
- MimeType contentType = resolveContentType(inputMessage.getHeaders());
- if (contentType != null && contentType.getType().equals(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType())
- && contentType.getSubtype().startsWith(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype())) {
+ if (!isBinary(inputMessage) && headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
+ MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders());
+ if (contentType.getType().equals(APPLICATION_CLOUDEVENTS.getType()) && contentType
+ .getSubtype().startsWith(APPLICATION_CLOUDEVENTS.getSubtype())) {
- String dataContentType = StringUtils.hasText(attributes.getDataContentType())
- ? attributes.getDataContentType()
+ String dataContentType = StringUtils.hasText(inputContentType) ? inputContentType
: MimeTypeUtils.APPLICATION_JSON_VALUE;
String suffix = contentType.getSubtypeSuffix();
- Assert.hasText(suffix, "Content-type 'suffix' can not be determined from " + contentType);
MimeType cloudEventDeserializationContentType = MimeTypeUtils
.parseMimeType(contentType.getType() + "/" + suffix);
Message> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)
.setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType)
- .setHeader(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, dataContentType)
- .build();
- Map structuredCloudEvent = (Map) messageConverter.fromMessage(cloudEventMessage, Map.class);
- Message> binaryCeMessage = buildCeMessageFromStructured(structuredCloudEvent, inputMessage.getHeaders());
+ .setHeader(DATACONTENTTYPE, dataContentType).build();
+ Map structuredCloudEvent = (Map) messageConverter
+ .fromMessage(cloudEventMessage, Map.class);
+
+ canonicalizeHeaders(structuredCloudEvent);
+ Message> binaryCeMessage = buildBinaryMessageFromStructuredMap(structuredCloudEvent,
+ inputMessage.getHeaders());
+
return binaryCeMessage;
}
}
- else if (StringUtils.hasText(attributes.getDataContentType())) {
- return MessageBuilder.fromMessage(inputMessage)
- .setHeader(MessageHeaders.CONTENT_TYPE, attributes.getDataContentType())
- .build();
+ else if (StringUtils.hasText(inputContentType)) { // this needs thinking since . .
+ return MessageBuilder.fromMessage(inputMessage).setHeader(MessageHeaders.CONTENT_TYPE, inputContentType)
+ .build();
}
return inputMessage;
}
- private static MimeType resolveContentType(MessageHeaders headers) {
- try {
- return contentTypeResolver.resolve(headers);
- }
- catch (Exception e) {
- // ignore
- }
- return null;
- }
/**
- * Will attempt to determine based on the headers the origin of Message (e.g., HTTP, Kafka etc)
- * and based on this designate prefix to be used for Cloud Events attributes (i.e., `ce-` or `ce_` etc).
+ * Determines attribute prefix based on the presence of certain well defined headers.
*
- * @param messageHeaders instance of {@link MessageHeaders}
- * @return prefix to be used for Cloud Events attributes
+ * TODO work in progress as it needs to be refined
+ *
+ * @param messageHeaders map of message headers
+ * @return prefix (e.g., 'ce_' or 'ce-' etc.)
*/
- public static String determinePrefixToUse(MessageHeaders messageHeaders) {
+ protected static String determinePrefixToUse(Map messageHeaders) {
Set keys = messageHeaders.keySet();
if (keys.contains("user-agent")) {
- return CloudEventMessageUtils.HTTP_ATTR_PREFIX;
- }
- else if (keys.contains("amqp")) {
- return CloudEventMessageUtils.AMQP_ATTR_PREFIX;
+ return HTTP_ATTR_PREFIX;
}
else {
- return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; // default which also suits Kafka 'ce_'
+ for (String key : messageHeaders.keySet()) {
+ if (key.startsWith("kafka_")) {
+ return DEFAULT_ATTR_PREFIX;
+ }
+ else if (key.startsWith("amqp_")) {
+ return AMQP_ATTR_PREFIX;
+ }
+ else if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
+ return DEFAULT_ATTR_PREFIX;
+ }
+ else if (key.startsWith(HTTP_ATTR_PREFIX)) {
+ return HTTP_ATTR_PREFIX;
+ }
+ else if (key.startsWith(AMQP_ATTR_PREFIX)) {
+ return AMQP_ATTR_PREFIX;
+ }
+ }
}
+
+ return "";
}
/**
- * Typically called by Consumer.
-
+ * Will check for the existence of required attributes. Assumes attributes (headers)
+ * are in canonical form.
+ * @param message input {@link Message}
+ * @return true if this Message represents Cloud Event in binary-mode
*/
- public static CloudEventAttributes generateAttributes(Message> message, CloudEventAttributesProvider provider) {
- CloudEventAttributes attributes = generateDefaultAttributeValues(new CloudEventAttributes(message.getHeaders()),
- message.getPayload().getClass().getName().getClass().getName(), message.getPayload().getClass().getName().getClass().getName());
- provider.generateDefaultCloudEventHeaders(attributes);
- return attributes;
+ protected static boolean isBinary(Message> message) {
+ return message.getHeaders().containsKey(SPECVERSION)
+ && message.getHeaders().containsKey(TYPE)
+ && message.getHeaders().containsKey(SOURCE);
}
- public static CloudEventAttributes generateAttributes(Message> inputMessage, String typeName, String sourceName) {
- CloudEventAttributes attributes = new CloudEventAttributes(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage.getHeaders()));
- return generateDefaultAttributeValues(attributes, sourceName, typeName);
+ /**
+ * Will canonicalize Cloud Event attributes (headers) by removing well known prefixes.
+ * So, for example 'ce_source' will become 'source'.
+ * @param headers message headers
+ */
+ private static void canonicalizeHeaders(Map headers) {
+ String[] keys = headers.keySet().toArray(new String[] {});
+ for (String key : keys) {
+ if (key.startsWith(HTTP_ATTR_PREFIX)) {
+ Object value = headers.remove(key);
+ key = key.substring(HTTP_ATTR_PREFIX.length());
+ headers.put(key, value);
+ }
+ else if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
+ Object value = headers.remove(key);
+ key = key.substring(DEFAULT_ATTR_PREFIX.length());
+ headers.put(key, value);
+ }
+ else if (key.startsWith(AMQP_ATTR_PREFIX)) {
+ Object value = headers.remove(key);
+ key = key.substring(AMQP_ATTR_PREFIX.length());
+ headers.put(key, value);
+ }
+ }
}
- private static Message> buildCeMessageFromStructured(Map structuredCloudEvent, MessageHeaders originalHeaders) {
- String prefixToUse = determinePrefixToUse(originalHeaders);
- Object data = null;
- if (structuredCloudEvent.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA)) {
- data = structuredCloudEvent.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA);
- structuredCloudEvent.remove(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA);
+ private static Message> buildBinaryMessageFromStructuredMap(Map structuredCloudEvent,
+ MessageHeaders originalHeaders) {
+ Object payload = structuredCloudEvent.remove(DATA);
+ if (payload == null) {
+ payload = Collections.emptyMap();
}
- else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.CANONICAL_DATA)) {
- data = structuredCloudEvent.get(CloudEventMessageUtils.CANONICAL_DATA);
- structuredCloudEvent.remove(CloudEventMessageUtils.CANONICAL_DATA);
+
+ CloudEventMessageBuilder> messageBuilder = CloudEventMessageBuilder
+ .withData(payload)
+ .copyHeaders(structuredCloudEvent);
+
+ for (String key : originalHeaders.keySet()) {
+ if (!MessageHeaders.ID.equals(key)) {
+ messageBuilder.setHeader(key, originalHeaders.get(key));
+ }
}
- else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.DATA)) {
- data = structuredCloudEvent.get(CloudEventMessageUtils.DATA);
- structuredCloudEvent.remove(CloudEventMessageUtils.DATA);
- }
- Assert.notNull(data, "'data' must not be null");
- MessageBuilder> builder = MessageBuilder.withPayload(data);
- CloudEventAttributes attributes = new CloudEventAttributes(structuredCloudEvent);
- builder.setHeader(prefixToUse + CloudEventMessageUtils.ID, attributes.getId());
- builder.setHeader(prefixToUse + CloudEventMessageUtils.SOURCE, attributes.getSource());
- builder.setHeader(prefixToUse + CloudEventMessageUtils.TYPE, attributes.getType());
- builder.setHeader(prefixToUse + CloudEventMessageUtils.SPECVERSION, attributes.getSpecversion());
- builder.copyHeaders(originalHeaders);
- return builder.build();
+
+ return messageBuilder.build();
}
- private static CloudEventAttributes generateDefaultAttributeValues(CloudEventAttributes attributes, String source, String type) {
- if (attributes.isValidCloudEvent()) {
- return attributes
- .setSpecversion("1.0")
- .setId(UUID.randomUUID().toString())
- .setType(type)
- .setSource(source);
+ private static URI safeGetURI(Map map, String key) {
+ Object uri = map.get(key);
+ if (uri != null && uri instanceof String) {
+ uri = URI.create((String) uri);
}
- return attributes;
+ return (URI) uri;
}
}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java
new file mode 100644
index 000000000..f595694ad
--- /dev/null
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.cloudevent;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.lang.Nullable;
+
+/**
+ *
+ * @author Oleg Zhurakousky
+ * @since 3.1
+ */
+@Configuration(proxyBeanMethods = false)
+class CloudEventsFunctionExtensionConfiguration {
+
+ @Bean
+ @ConditionalOnMissingClass("io.cloudevents.CloudEvent")
+ public CloudEventsFunctionInvocationHelper nativeFunctionInvocationHelper(@Nullable CloudEventHeaderEnricher cloudEventHeadersProvider) {
+ return new CloudEventsFunctionInvocationHelper(cloudEventHeadersProvider);
+ }
+
+ @Bean
+ @ConditionalOnClass(name = "io.cloudevents.CloudEvent")
+ public CloudEventsFunctionInvocationHelper sdkFunctionInvocationHelper() {
+ // TODO you may need SDKs header provider
+ return null;
+ }
+}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java
new file mode 100644
index 000000000..c45c010bf
--- /dev/null
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.cloudevent;
+
+import java.net.URI;
+import java.util.UUID;
+
+import org.springframework.beans.BeansException;
+import org.springframework.cloud.function.core.FunctionInvocationHelper;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.StringUtils;
+
+/**
+ *
+ * @author Oleg Zhurakousky
+ * @since 2.0
+ *
+ */
+class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper>, ApplicationContextAware {
+
+ private ConfigurableApplicationContext applicationContext;
+
+ private final CloudEventHeaderEnricher cloudEventAttributesProvider;
+
+ CloudEventsFunctionInvocationHelper(@Nullable CloudEventHeaderEnricher cloudEventHeadersProvider) {
+ this.cloudEventAttributesProvider = cloudEventHeadersProvider;
+ }
+
+ @Override
+ public boolean isRetainOuputAsMessage(Message> message) {
+ if (message.getHeaders().containsKey("message-type") && message.getHeaders().get("message-type").equals("cloudevent")) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Message> preProcessInput(Message> input, Object inputConverter) {
+ return CloudEventMessageUtils.toCannonical(input, (MessageConverter) inputConverter);
+ }
+
+ @Override
+ public Message> postProcessResult(Message> input, Object result) {
+ Message> resultMessage = null;
+ if (CloudEventMessageUtils.isBinary(input)) {
+ CloudEventMessageBuilder> messageBuilder = CloudEventMessageBuilder
+ .withData(result)
+ .setId(UUID.randomUUID().toString())
+ .setSource(URI.create("http://spring.io/" + getApplicationName()))
+ .setType(result.getClass().getName());
+
+ if (this.cloudEventAttributesProvider != null) {
+ messageBuilder = this.cloudEventAttributesProvider.enrich(messageBuilder);
+ }
+
+ String prefix = this.determineOutputPrefix(input);
+ resultMessage = messageBuilder.build(prefix);
+ }
+ else {
+ resultMessage = MessageBuilder.withPayload(result).build();
+ }
+ return resultMessage;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = (ConfigurableApplicationContext) applicationContext;
+ }
+
+ private String determineOutputPrefix(Message> input) {
+ //TODO rework to actually figure out where output goes instead of relying on input
+ return CloudEventMessageUtils.determinePrefixToUse(input.getHeaders());
+ }
+
+ private String getApplicationName() {
+ ConfigurableEnvironment environment = this.applicationContext.getEnvironment();
+ String name = environment.getProperty("spring.application.name");
+ return (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId());
+ }
+}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java
index 60261c1a1..100294f83 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java
@@ -20,7 +20,6 @@ import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Set;
-import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -32,26 +31,22 @@ import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
-import org.springframework.cloud.function.cloudevent.CloudEventAttributes;
-import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider;
-import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
+import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.convert.ConversionService;
-import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
-import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
/**
- * Implementation of {@link FunctionRegistry} capable of discovering functioins in
- * {@link BeanFactory}.
+ * Implementation of {@link FunctionRegistry} capable of discovering functioins in {@link BeanFactory}.
*
* @author Oleg Zhurakousky
*/
@@ -59,19 +54,14 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
private GenericApplicationContext applicationContext;
- private CloudEventAttributesProvider cloudEventAtttributesProvider;
-
- public BeanFactoryAwareFunctionRegistry(ConversionService conversionService,
- CompositeMessageConverter messageConverter, JsonMapper jsonMapper) {
- super(conversionService, messageConverter, jsonMapper);
+ public BeanFactoryAwareFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter,
+ JsonMapper jsonMapper, @Nullable FunctionInvocationHelper> functionInvocationHelper) {
+ super(conversionService, messageConverter, jsonMapper, functionInvocationHelper);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (GenericApplicationContext) applicationContext;
- if (applicationContext.getBeanNamesForType(CloudEventAttributesProvider.class).length > 0) {
- this.cloudEventAtttributesProvider = applicationContext.getBean(CloudEventAttributesProvider.class);
- }
}
/*
@@ -80,9 +70,10 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
*/
@Override
public int size() {
- return this.applicationContext.getBeanNamesForType(Supplier.class).length
- + this.applicationContext.getBeanNamesForType(Function.class).length
- + this.applicationContext.getBeanNamesForType(Consumer.class).length + super.size();
+ return this.applicationContext.getBeanNamesForType(Supplier.class).length +
+ this.applicationContext.getBeanNamesForType(Function.class).length +
+ this.applicationContext.getBeanNamesForType(Consumer.class).length +
+ super.size();
}
/*
@@ -92,9 +83,12 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
public Set getNames(Class> type) {
Set registeredNames = super.getNames(type);
if (type == null) {
- registeredNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Function.class)));
- registeredNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Supplier.class)));
- registeredNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Consumer.class)));
+ registeredNames
+ .addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Function.class)));
+ registeredNames
+ .addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Supplier.class)));
+ registeredNames
+ .addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Consumer.class)));
}
else {
registeredNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(type)));
@@ -105,8 +99,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public T lookup(Class> type, String functionDefinition, String... expectedOutputMimeTypes) {
- functionDefinition = StringUtils.hasText(functionDefinition) ? functionDefinition
- : this.applicationContext.getEnvironment().getProperty(FunctionProperties.FUNCTION_DEFINITION, "");
+ functionDefinition = StringUtils.hasText(functionDefinition)
+ ? functionDefinition
+ : this.applicationContext.getEnvironment().getProperty(FunctionProperties.FUNCTION_DEFINITION, "");
functionDefinition = this.normalizeFunctionDefinition(functionDefinition);
if (!StringUtils.hasText(functionDefinition)) {
@@ -117,8 +112,7 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
if (function == null) {
Set functionRegistratioinNames = super.getNames(null);
- String[] functionNames = StringUtils
- .delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");
+ String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");
for (String functionName : functionNames) {
if (functionRegistratioinNames.contains(functionName)) {
logger.info("Skipping function '" + functionName + "' since it is already present");
@@ -132,31 +126,26 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
functionRegistration = (FunctionRegistration) functionCandidate;
}
else if (this.isFunctionPojo(functionCandidate, functionName)) {
- Method functionalMethod = FunctionTypeUtils
- .discoverFunctionalMethod(functionCandidate.getClass());
+ Method functionalMethod = FunctionTypeUtils.discoverFunctionalMethod(functionCandidate.getClass());
functionCandidate = this.proxyTarget(functionCandidate, functionalMethod);
functionType = FunctionTypeUtils.fromFunctionMethod(functionalMethod);
}
else if (this.isSpecialFunctionRegistration(functionNames, functionName)) {
- functionRegistration = this.applicationContext.getBean(
- functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX,
- FunctionRegistration.class);
+ functionRegistration = this.applicationContext
+ .getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class);
}
else {
- functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName,
- this.applicationContext);
+ functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext);
}
if (functionRegistration == null) {
- functionRegistration = new FunctionRegistration(functionCandidate, functionName)
- .type(functionType);
+ functionRegistration = new FunctionRegistration(functionCandidate, functionName).type(functionType);
}
this.register(functionRegistration);
}
else {
if (logger.isDebugEnabled()) {
- logger.debug("Function '" + functionName
- + "' is not available in FunctionCatalog or BeanFactory");
+ logger.debug("Function '" + functionName + "' is not available in FunctionCatalog or BeanFactory");
}
}
}
@@ -164,36 +153,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes);
}
- if (function != null) {
- BiFunction, Object, Message>> invocationResultHeaderEnricher = new BiFunction, Object, Message>>() {
- @Override
- public Message> apply(Message> inputMessage, Object invocationResult) {
- // TODO: Factor it out! Cloud Events specific code
- CloudEventAttributes generatedCeHeaders = CloudEventMessageUtils.generateAttributes(inputMessage,
- invocationResult.getClass().getName(), getApplicationName());
- CloudEventAttributes attributes = new CloudEventAttributes(generatedCeHeaders,
- CloudEventMessageUtils.determinePrefixToUse(inputMessage.getHeaders()));
- if (cloudEventAtttributesProvider != null) {
- cloudEventAtttributesProvider.generateDefaultCloudEventHeaders(attributes);
- }
- Message message = MessageBuilder.withPayload(invocationResult).copyHeaders(attributes).build();
-
- return message;
- }
- };
- function.setOutputMessageHeaderEnricher(invocationResultHeaderEnricher);
- }
-
return (T) function;
}
- private String getApplicationName() {
- ConfigurableEnvironment environment = this.applicationContext.getEnvironment();
- String name = environment.getProperty("spring.application.name");
- return "http://spring.io/"
- + (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId());
- }
-
private Object discoverFunctionInBeanFactory(String functionName) {
Object functionCandidate = null;
if (this.applicationContext.containsBean(functionName)) {
@@ -201,8 +163,7 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
}
else {
try {
- functionCandidate = BeanFactoryAnnotationUtils
- .qualifiedBeanOfType(this.applicationContext.getBeanFactory(), Object.class, functionName);
+ functionCandidate = BeanFactoryAnnotationUtils.qualifiedBeanOfType(this.applicationContext.getBeanFactory(), Object.class, functionName);
}
catch (Exception e) {
// ignore since there is no safe isAvailable-kind of method
@@ -217,19 +178,21 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
}
private boolean isFunctionPojo(Object functionCandidate, String functionName) {
- return !functionCandidate.getClass().isSynthetic() && !(functionCandidate instanceof Supplier)
- && !(functionCandidate instanceof Function) && !(functionCandidate instanceof Consumer)
- && !this.applicationContext.containsBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX);
+ return !functionCandidate.getClass().isSynthetic()
+ && !(functionCandidate instanceof Supplier)
+ && !(functionCandidate instanceof Function)
+ && !(functionCandidate instanceof Consumer)
+ && !this.applicationContext.containsBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX);
}
/**
- * At the moment 'special function registration' simply implies that a bean under the
- * provided functionName may have already been wrapped and registered as
- * FunuctionRegistration with BeanFactory under the name of the function suffixed with
- * {@link FunctionRegistration#REGISTRATION_NAME_SUFFIX} (e.g.,
- * 'myKotlinFunction_registration').
- *
+ * At the moment 'special function registration' simply implies that a bean under the provided functionName
+ * may have already been wrapped and registered as FunuctionRegistration with BeanFactory under the name of
+ * the function suffixed with {@link FunctionRegistration#REGISTRATION_NAME_SUFFIX}
+ * (e.g., 'myKotlinFunction_registration').
+ *
* At the moment only Kotlin module does this
+ *
* @param functionCandidate candidate for FunctionInvocationWrapper instance
* @param functionName the name of the function
* @return true if this function candidate qualifies
@@ -250,5 +213,4 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
});
return pf.getProxy();
}
-
}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java
index fc2b56d8b..b801e6a8d 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java
@@ -31,7 +31,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
-import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -46,13 +45,12 @@ import reactor.util.function.Tuples;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.function.cloudevent.CloudEventAttributes;
-import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.config.RoutingFunction;
+import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.core.ResolvableType;
import org.springframework.core.convert.ConversionService;
@@ -98,10 +96,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
private final JsonMapper jsonMapper;
+ private final FunctionInvocationHelper> functionInvocationHelper;
+
@Autowired(required = false)
private FunctionAroundWrapper functionAroundWrapper;
- public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper) {
+ public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper,
+ @Nullable FunctionInvocationHelper> functionInvocationHelper) {
Assert.notNull(messageConverter, "'messageConverter' must not be null");
Assert.notNull(jsonMapper, "'jsonMapper' must not be null");
this.conversionService = conversionService;
@@ -109,6 +110,11 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
this.messageConverter = messageConverter;
this.headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
this.headersField.setAccessible(true);
+ this.functionInvocationHelper = functionInvocationHelper;
+ }
+
+ public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper) {
+ this(conversionService, messageConverter, jsonMapper, null);
}
@SuppressWarnings("unchecked")
@@ -322,11 +328,11 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
*/
private Function enhancer;
- private BiFunction, Object, Message>> outputMessageHeaderEnricher;
-
- void setOutputMessageHeaderEnricher(BiFunction, Object, Message>> outputMessageHeaderEnricher) {
- this.outputMessageHeaderEnricher = outputMessageHeaderEnricher;
- }
+// private BiFunction, Object, Message>> outputMessageHeaderEnricher;
+//
+// void setOutputMessageHeaderEnricher(BiFunction, Object, Message>> outputMessageHeaderEnricher) {
+// this.outputMessageHeaderEnricher = outputMessageHeaderEnricher;
+// }
FunctionInvocationWrapper(FunctionInvocationWrapper function) {
this.target = function.target;
@@ -623,8 +629,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v));
}
else {
- if (this.outputMessageHeaderEnricher != null) {
- result = this.outputMessageHeaderEnricher.apply((Message>) input, result);
+ if (functionInvocationHelper != null) {
+ result = functionInvocationHelper.postProcessResult((Message>) input, result);
}
else {
result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build();
@@ -823,7 +829,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
return null;
}
- input = CloudEventMessageUtils.toBinary((Message>) input, messageConverter);
+ if (functionInvocationHelper != null) {
+ input = functionInvocationHelper.preProcessInput((Message>) input, messageConverter);
+ }
convertedInput = this.convertInputMessageIfNecessary((Message) input, type);
if (convertedInput == null) { // give ConversionService a chance
@@ -910,7 +918,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
* case that requires it since it may contain forwarding url
*/
private boolean containsRetainMessageSignalInHeaders(Message message) {
- if (new CloudEventAttributes(message.getHeaders()).isValidCloudEvent()) {
+ if (functionInvocationHelper != null && functionInvocationHelper.isRetainOuputAsMessage(message)) {
return true;
}
else {
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java
index f03158e50..161d6cae3 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java
@@ -34,6 +34,7 @@ import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
+import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.function.json.GsonMapper;
import org.springframework.cloud.function.json.JacksonMapper;
import org.springframework.cloud.function.json.JsonMapper;
@@ -47,6 +48,8 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.core.convert.converter.GenericConverter;
import org.springframework.core.convert.support.ConfigurableConversionService;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
@@ -71,7 +74,8 @@ public class ContextFunctionCatalogAutoConfiguration {
static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper";
@Bean
- public FunctionRegistry functionCatalog(List messageConverters, JsonMapper jsonMapper, ConfigurableApplicationContext context) {
+ public FunctionRegistry functionCatalog(List messageConverters, JsonMapper jsonMapper,
+ ConfigurableApplicationContext context, @Nullable FunctionInvocationHelper> functionInvocationHelper) {
ConfigurableConversionService conversionService = (ConfigurableConversionService) context.getBeanFactory().getConversionService();
Map converters = context.getBeansOfType(GenericConverter.class);
for (GenericConverter converter : converters.values()) {
@@ -105,7 +109,7 @@ public class ContextFunctionCatalogAutoConfiguration {
messageConverter = new SmartCompositeMessageConverter(mcList);
}
- return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper);
+ return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper, functionInvocationHelper);
}
@Bean(RoutingFunction.FUNCTION_NAME)
diff --git a/spring-cloud-function-context/src/main/resources/META-INF/spring.factories b/spring-cloud-function-context/src/main/resources/META-INF/spring.factories
index 1dbbeee8d..fa3023fb4 100644
--- a/spring-cloud-function-context/src/main/resources/META-INF/spring.factories
+++ b/spring-cloud-function-context/src/main/resources/META-INF/spring.factories
@@ -1,6 +1,7 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
-org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration
+org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration,\
+org.springframework.cloud.function.cloudevent.CloudEventsFunctionExtensionConfiguration
org.springframework.cloud.function.context.WrapperDetector=\
org.springframework.cloud.function.context.config.FluxWrapperDetector
org.springframework.context.ApplicationContextInitializer=\
-org.springframework.cloud.function.context.config.ContextFunctionCatalogInitializer
+org.springframework.cloud.function.context.config.ContextFunctionCatalogInitializer
\ No newline at end of file
diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java
index 96a5661bf..ff9bf4b0b 100644
--- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java
+++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java
@@ -16,7 +16,9 @@
package org.springframework.cloud.function.cloudevent;
+import java.net.URI;
import java.text.SimpleDateFormat;
+import java.util.UUID;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
@@ -29,7 +31,6 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
-import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
@@ -42,12 +43,50 @@ public class CloudEventFunctionTests {
@SuppressWarnings("unchecked")
@Test
- public void testBinaryPojoToPojoDefaultOutputAttributeProvider() {
+ public void testBinaryPojoToPojoDefaultOutputHeaderProvider() {
Function function = this.lookup("echo", TestConfiguration.class);
- Message inputMessage = MessageBuilder.withPayload("{\"name\":\"Ricky\"}")
- .copyHeaders(CloudEventMessageUtils.get("https://spring.io/", "org.springframework")).build();
- assertThat(CloudEventMessageUtils.isBinary(inputMessage.getHeaders())).isTrue();
+ String id = UUID.randomUUID().toString();
+
+ Message inputMessage = CloudEventMessageBuilder
+ .withData("{\"name\":\"Ricky\"}")
+ .setId(id)
+ .setSource("https://spring.io/")
+ .setType("org.springframework")
+ .build();
+
+ assertThat(inputMessage.getHeaders().getId()).isEqualTo(UUID.fromString(id));
+ assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isTrue();
+
+ Message resultMessage = (Message) function.apply(inputMessage);
+
+
+ /*
+ * Validates that although user only deals with POJO, the framework recognizes
+ * both on input and output that it is dealing with Cloud Event and generates
+ * appropriate headers/attributes
+ */
+ assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue();
+ assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName());
+ assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
+ }
+
+ // this kind of emulates that message came from Kafka
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testBinaryPojoToPojoDefaultOutputHeaderProviderWithPrefix() {
+ Function function = this.lookup("echo", TestConfiguration.class);
+
+ String id = UUID.randomUUID().toString();
+
+ Message inputMessage = CloudEventMessageBuilder
+ .withData("{\"name\":\"Ricky\"}")
+ .setHeader("ce_id", id)
+ .setHeader("ce_source", "https://spring.io/")
+ .setHeader("ce_type", "org.springframework")
+ .build();
+
+// assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isTrue();
Message resultMessage = (Message) function.apply(inputMessage);
@@ -56,10 +95,9 @@ public class CloudEventFunctionTests {
* both on input and output that it is dealing with Cloud Event and generates
* appropriate headers/attributes
*/
- CloudEventAttributes attributes = new CloudEventAttributes(resultMessage.getHeaders());
- assertThat(attributes.isValidCloudEvent()).isTrue();
- assertThat((String) attributes.getType()).isEqualTo(Person.class.getName());
- assertThat((String) attributes.getSource()).isEqualTo("http://spring.io/application-application");
+ assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue();
+ assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName());
+ assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}
@SuppressWarnings("unchecked")
@@ -79,24 +117,25 @@ public class CloudEventFunctionTests {
"}";
Function function = this.lookup("springRelease", TestConfiguration.class);
- Message inputMessage = MessageBuilder.withPayload(payload)
+ Message inputMessage = CloudEventMessageBuilder
+ .withData(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json")
.build();
- assertThat(CloudEventMessageUtils.isBinary(inputMessage.getHeaders())).isFalse();
+
+ assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isFalse();
Message resultMessage = (Message) function.apply(inputMessage);
assertThat(resultMessage.getPayload().getReleaseDate())
.isEqualTo(new SimpleDateFormat("dd-MM-yyyy").parse("01-10-2006"));
assertThat(resultMessage.getPayload().getVersion()).isEqualTo("2.0");
- /*
- * Validates that although user only deals with POJO, the framework recognizes
- * both on input and output that it is dealing with Cloud Event and generates
- * appropriate headers/attributes
- */
- CloudEventAttributes attributes = new CloudEventAttributes(resultMessage.getHeaders());
- assertThat(attributes.isValidCloudEvent()).isTrue();
- assertThat((String) attributes.getType()).isEqualTo(SpringReleaseEvent.class.getName());
- assertThat((String) attributes.getSource()).isEqualTo("http://spring.io/application-application");
+// /*
+// * Validates that although user only deals with POJO, the framework recognizes
+// * both on input and output that it is dealing with Cloud Event and generates
+// * appropriate headers/attributes
+// */
+ assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue();
+ assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName());
+ assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}
@SuppressWarnings("unchecked")
@@ -115,10 +154,11 @@ public class CloudEventFunctionTests {
"}";
Function function = this.lookup("springRelease", TestConfiguration.class);
- Message inputMessage = MessageBuilder.withPayload(payload)
+ Message inputMessage = CloudEventMessageBuilder
+ .withData(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json")
.build();
- assertThat(CloudEventMessageUtils.isBinary(inputMessage.getHeaders())).isFalse();
+ assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isFalse();
Message resultMessage = (Message) function.apply(inputMessage);
assertThat(resultMessage.getPayload().getReleaseDate())
@@ -129,10 +169,9 @@ public class CloudEventFunctionTests {
* both on input and output that it is dealing with Cloud Event and generates
* appropriate headers/attributes
*/
- CloudEventAttributes attributes = new CloudEventAttributes(resultMessage.getHeaders());
- assertThat(attributes.isValidCloudEvent()).isTrue();
- assertThat((String) attributes.getType()).isEqualTo(SpringReleaseEvent.class.getName());
- assertThat((String) attributes.getSource()).isEqualTo("http://spring.io/application-application");
+ assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue();
+ assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName());
+ assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}
private Function lookup(String functionDefinition, Class>... configClass) {
diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java
deleted file mode 100644
index 67e2b5cf8..000000000
--- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2019-2019 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.cloud.function.cloudevent;
-
-import java.lang.reflect.Field;
-import java.util.UUID;
-
-import org.junit.jupiter.api.Test;
-
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.builder.SpringApplicationBuilder;
-import org.springframework.cloud.function.context.FunctionCatalog;
-import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
-import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageHeaders;
-import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.util.MimeTypeUtils;
-import org.springframework.util.ReflectionUtils;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- *
- * @author Oleg Zhurakousky
- *
- */
-public class CloudEventTypeConversionTests {
- @Test
- public void testFromMessageBinaryPayloadMatchesType() {
- SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
- CloudEventAttributes ceAttributes = CloudEventMessageUtils
- .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework");
- ceAttributes.setDataContentType("text/plain");
- Message message = MessageBuilder.withPayload("Hello Ricky").copyHeaders(ceAttributes).build();
-
- String converted = (String) messageConverter.fromMessage(message, String.class);
- assertThat(converted).isEqualTo("Hello Ricky");
- }
-
- @Test
- public void testFromMessageBinaryPayloadDoesNotMatchType() {
- SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
- CloudEventAttributes ceAttributes = CloudEventMessageUtils
- .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework");
- Message message = MessageBuilder.withPayload("Hello Ricky".getBytes())
- .copyHeaders(ceAttributes)
- .setHeader(MessageHeaders.CONTENT_TYPE,
- MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
- .build();
- String converted = (String) messageConverter.fromMessage(message, String.class);
- assertThat(converted).isEqualTo("Hello Ricky");
- }
-
- @Test // JsonMessageConverter does some special things between byte[] and String so this works
- public void testFromMessageBinaryPayloadNoDataContentTypeToString() {
- SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
- CloudEventAttributes ceAttributes = CloudEventMessageUtils
- .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework");
- Message message = MessageBuilder.withPayload("Hello Ricky".getBytes())
- .copyHeaders(ceAttributes)
- .setHeader(MessageHeaders.CONTENT_TYPE,
- MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
- .build();
- String converted = (String) messageConverter.fromMessage(message, String.class);
- assertThat(converted).isEqualTo("Hello Ricky");
- }
-
- @Test // Unlike the previous test the type here is POJO so no special treatement
- public void testFromMessageBinaryPayloadNoDataContentTypeToPOJO() {
- SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
- CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework");
- Message message = MessageBuilder.withPayload("Hello Ricky".getBytes())
- .copyHeaders(ceAttributes)
- .setHeader(MessageHeaders.CONTENT_TYPE,
- MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
- .build();
- String converted = (String) messageConverter.fromMessage(message, Person.class);
- assertThat(converted).isNull();
- }
-
- @Test // will fall on default CT which is json
- public void testFromMessageBinaryPayloadNoDataContentTypeToPOJOThatWorks() {
- SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
- CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework");
- Message message = MessageBuilder.withPayload("{\"name\":\"Ricky\"}".getBytes())
- .copyHeaders(ceAttributes)
- .setHeader(MessageHeaders.CONTENT_TYPE,
- MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
- .build();
- Person converted = (Person) messageConverter.fromMessage(message, Person.class);
- assertThat(converted.getName()).isEqualTo("Ricky");
- }
-
- private SmartCompositeMessageConverter configure(Class>... configClass) {
- ApplicationContext context = new SpringApplicationBuilder(configClass).run(
- "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.main.lazy-initialization=true");
- FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
- Field f = ReflectionUtils.findField(BeanFactoryAwareFunctionRegistry.class, "messageConverter");
- f.setAccessible(true);
- try {
- SmartCompositeMessageConverter messageConverter = (SmartCompositeMessageConverter) f.get(catalog);
- return messageConverter;
- }
- catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- @EnableAutoConfiguration
- @Configuration
- public static class DummyConfiguration {
- }
-
- public static class Person {
- private String name;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
- }
-}
diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionInvocationHelper.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionInvocationHelper.java
new file mode 100644
index 000000000..b0649912a
--- /dev/null
+++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionInvocationHelper.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.springframework.cloud.function.core;
+
+
+/**
+ *
+ * @author Oleg Zhurakousky
+ *
+ */
+public interface FunctionInvocationHelper {
+
+ boolean isRetainOuputAsMessage(I input);
+
+ I preProcessInput(I input, Object inputConverter);
+
+ I postProcessResult(I input, Object result);
+}
diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java
index 2ac2565ca..ef44106a1 100644
--- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java
+++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java
@@ -25,13 +25,13 @@ import java.util.function.Function;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
-import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider;
+import org.springframework.cloud.function.cloudevent.CloudEventHeaderEnricher;
+import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.web.util.HeaderUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.http.RequestEntity;
import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
@@ -104,9 +104,9 @@ public class CloudeventDemoApplication {
}
@Bean
- public CloudEventAttributesProvider cloudEventAttributesProvider() {
- return attributes -> {
- attributes.setSource("https://interface21.com/").setType("com.interface21");
+ public CloudEventHeaderEnricher cloudEventHeaderEnricher() {
+ return headers -> {
+ return headers.setSource("https://interface21.com/").setType("com.interface21");
};
}
@@ -129,11 +129,11 @@ public class CloudeventDemoApplication {
}
@Bean
- public Consumer> pojoConsumer(CloudEventAttributesProvider provider, RestTemplateBuilder builder) {
+ public Consumer> pojoConsumer(CloudEventHeaderEnricher enricher, RestTemplateBuilder builder) {
return eventMessage -> {
+ Message> newMessage = enricher.enrich(CloudEventMessageBuilder.fromMessage(eventMessage)).build(CloudEventMessageUtils.HTTP_ATTR_PREFIX);
RequestEntity entity = RequestEntity.post(URI.create("http://foo.com"))
- .headers(HeaderUtils.fromMessage(
- new MessageHeaders(CloudEventMessageUtils.generateAttributes(eventMessage, provider))))
+ .headers(HeaderUtils.fromMessage(newMessage.getHeaders()))
.body(eventMessage.getPayload());
List sourceHeader = entity.getHeaders().get("ce-source");
Assert.isTrue(sourceHeader.get(0).equals("https://interface21.com/"), "'source' must be https://interface21.com/");
diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java
index acefaf273..e36788d10 100644
--- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java
+++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java
@@ -21,6 +21,7 @@ import java.util.function.Function;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.context.ConfigurableApplicationContext;
@@ -40,9 +41,11 @@ public class CloudeventDemoApplicationFunctionTests {
try(ConfigurableApplicationContext context = new SpringApplicationBuilder(CloudeventDemoApplication.class)
.web(WebApplicationType.NONE).run()) {
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
- Message binaryCloudEventMessage = MessageBuilder
- .withPayload("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}")
- .copyHeaders(CloudEventMessageUtils.get("spring.io/spring-event", "com.example.springevent"))
+
+ Message inputMessage = CloudEventMessageBuilder
+ .withData("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}")
+ .setSource("https://spring.io/spring-event")
+ .setType("com.example.springevent")
.build();
/*
@@ -51,16 +54,16 @@ public class CloudeventDemoApplicationFunctionTests {
* inside spring-cloud-function.
*/
Function, Message> asPojoMessage = catalog.lookup("asPOJOMessage");
- System.out.println(asPojoMessage.apply(binaryCloudEventMessage));
+ System.out.println(asPojoMessage.apply(inputMessage));
Function, Message> asPojo = catalog.lookup("asPOJO");
- System.out.println(asPojo.apply(binaryCloudEventMessage));
+ System.out.println(asPojo.apply(inputMessage));
Function, Message> asString = catalog.lookup("asString");
- System.out.println(asString.apply(binaryCloudEventMessage));
+ System.out.println(asString.apply(inputMessage));
Function, Message> asStringMessage = catalog.lookup("asStringMessage");
- System.out.println(asStringMessage.apply(binaryCloudEventMessage));
+ System.out.println(asStringMessage.apply(inputMessage));
}
}
@@ -69,9 +72,11 @@ public class CloudeventDemoApplicationFunctionTests {
try(ConfigurableApplicationContext context = new SpringApplicationBuilder(CloudeventDemoApplication.class)
.web(WebApplicationType.NONE).run()) {
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
- Message binaryCloudEventMessage = MessageBuilder
- .withPayload("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}")
- .copyHeaders(CloudEventMessageUtils.get("spring.io/spring-event", "com.example.springevent"))
+
+ Message inputMessage = CloudEventMessageBuilder
+ .withData("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}")
+ .setSource("https://spring.io/spring-event")
+ .setType("com.example.springevent")
.build();
/*
@@ -80,7 +85,7 @@ public class CloudeventDemoApplicationFunctionTests {
* inside spring-cloud-function.
*/
Function, Message> asPojoMessage = catalog.lookup("consumeAndProduceCloudEvent");
- System.out.println(asPojoMessage.apply(binaryCloudEventMessage));
+ System.out.println(asPojoMessage.apply(inputMessage));
}
}
@@ -89,9 +94,11 @@ public class CloudeventDemoApplicationFunctionTests {
try(ConfigurableApplicationContext context = new SpringApplicationBuilder(CloudeventDemoApplication.class)
.web(WebApplicationType.NONE).run()) {
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
- Message binaryCloudEventMessage = MessageBuilder
- .withPayload("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}")
- .copyHeaders(CloudEventMessageUtils.get("spring.io/spring-event", "com.example.springevent"))
+
+ Message inputMessage = CloudEventMessageBuilder
+ .withData("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}")
+ .setSource("https://spring.io/spring-event")
+ .setType("com.example.springevent")
.build();
/*
@@ -100,7 +107,7 @@ public class CloudeventDemoApplicationFunctionTests {
* inside spring-cloud-function.
*/
Function, Message> asPojoMessage = catalog.lookup("consumeAndProduceCloudEventAsPojoToPojo");
- System.out.println(asPojoMessage.apply(binaryCloudEventMessage));
+ System.out.println(asPojoMessage.apply(inputMessage));
}
}
}
diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java
index d5165e013..493e08c8e 100644
--- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java
+++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java
@@ -33,7 +33,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.ObjectProvider;
-import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.message.MessageUtils;
@@ -218,10 +217,10 @@ public class RequestProcessor {
}
private boolean isValidCloudEvent(Set headerKeys) {
- return headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)
- && headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)
- && headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)
- && headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION);
+ return headerKeys.contains("ce-id")
+ && headerKeys.contains("ce-source")
+ && headerKeys.contains("ce-type")
+ && headerKeys.contains("ce-specversion");
}
// this seem to be very relevant to AWS container tests