From 306da4248a8308e7519fc8a4c4bb6ce206076e94 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 30 Nov 2020 15:54:50 +0100 Subject: [PATCH] Restructure Cloud Events support to optionally support Cloud Events SDK --- .../cloudevent/CloudEventAttributes.java | 187 ---------- .../CloudEventAttributesProvider.java | 48 --- .../cloudevent/CloudEventHeaderEnricher.java | 54 +++ .../cloudevent/CloudEventMessageBuilder.java | 196 ++++++++++ .../cloudevent/CloudEventMessageUtils.java | 334 ++++++++---------- ...dEventsFunctionExtensionConfiguration.java | 45 +++ .../CloudEventsFunctionInvocationHelper.java | 101 ++++++ .../BeanFactoryAwareFunctionRegistry.java | 114 ++---- .../catalog/SimpleFunctionRegistry.java | 34 +- ...ntextFunctionCatalogAutoConfiguration.java | 8 +- .../main/resources/META-INF/spring.factories | 5 +- .../cloudevent/CloudEventFunctionTests.java | 91 +++-- .../CloudEventTypeConversionTests.java | 142 -------- .../core/FunctionInvocationHelper.java | 33 ++ .../cloudevent/CloudeventDemoApplication.java | 16 +- ...loudeventDemoApplicationFunctionTests.java | 37 +- .../cloud/function/web/RequestProcessor.java | 9 +- 17 files changed, 751 insertions(+), 703 deletions(-) delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventHeaderEnricher.java create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java delete mode 100644 spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionInvocationHelper.java diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java deleted file mode 100644 index 533eb2342..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.cloudevent; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import org.springframework.messaging.MessageHeaders; -import org.springframework.util.StringUtils; - - -/** - * Utility class to assist with accessing and setting Cloud Events attributes from {@link MessageHeaders}. - *

- * It is effectively a wrapper over {@link MessageHeaders} which is a {@link Map}. - * It also provides best effort to both discover the actual attribute name (regardless of the prefix) - * as well as set appropriate attribute name. - *

- * For example, If there is an attribute `ce-source` or `ce_source` or 'source`, by simply calling getSource() - * we'll discover it and will return its value. - *
- * Similar effort will happen during the setting of the attribute. If you provide {@link #prefixToUse} we will - * use it otherwise we'll attempt to determine based on current execution context which prefix to use. - * - * @author Oleg Zhurakousky - * @author Dave Syer - * - * @since 3.1 - */ -public class CloudEventAttributes extends HashMap { - - /** - * - */ - private static final long serialVersionUID = 5393610770855366497L; - - - private final String prefixToUse; - - public CloudEventAttributes(Map headers, String prefixToUse) { - super(headers); - this.prefixToUse = prefixToUse; - } - - public CloudEventAttributes(Map headers) { - this(headers, null); - } - - public CloudEventAttributes setId(String id) { - this.setAttribute(CloudEventMessageUtils.ID, id); - return this; - } - - public A getId() { - A id = this.getAttribute(CloudEventMessageUtils.ID); - if (id instanceof UUID) { - id = null; - } - return id; - } - - public CloudEventAttributes setSource(String source) { - this.setAttribute(CloudEventMessageUtils.SOURCE, source); - return this; - } - - public A getSource() { - return this.getAttribute(CloudEventMessageUtils.SOURCE); - } - - public CloudEventAttributes setSpecversion(String specversion) { - this.setAttribute(CloudEventMessageUtils.SPECVERSION, specversion); - return this; - } - - public A getSpecversion() { - return this.getAttribute(CloudEventMessageUtils.SPECVERSION); - } - - public CloudEventAttributes setType(String type) { - this.setAttribute(CloudEventMessageUtils.TYPE, type); - return this; - } - - public A getType() { - return this.getAttribute(CloudEventMessageUtils.TYPE); - } - - public CloudEventAttributes setDataContentType(String datacontenttype) { - this.setAttribute(CloudEventMessageUtils.DATACONTENTTYPE, datacontenttype); - return this; - } - - public A getDataContentType() { - return this.getAttribute(CloudEventMessageUtils.DATACONTENTTYPE); - } - - public CloudEventAttributes setDataSchema(String dataschema) { - this.setAttribute(CloudEventMessageUtils.DATASCHEMA, dataschema); - return this; - } - - public A getDataSchema() { - return this.getAttribute(CloudEventMessageUtils.DATASCHEMA); - } - - public CloudEventAttributes setSubject(String subject) { - this.setAttribute(CloudEventMessageUtils.SUBJECT, subject); - return this; - } - - public A getSubect() { - return this.getAttribute(CloudEventMessageUtils.SUBJECT); - } - - public CloudEventAttributes setTime(String time) { - this.setAttribute(CloudEventMessageUtils.TIME, time); - return this; - } - - public A getTime() { - return this.getAttribute(CloudEventMessageUtils.TIME); - } - - /** - * Will delegate to the underlying {@link Map} returning the value for the requested attribute or null. - */ - @SuppressWarnings("unchecked") - public A getAttribute(String attrName) { - if (this.containsKey(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attrName)) { - return (A) this.get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attrName); - } - else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attrName)) { - return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attrName); - } - return (A) this.get(attrName); - } - - /** - * Determines if this instance of {@link CloudEventAttributes} represents valid Cloud Event. - * This implies that it contains all 4 required attributes (id, source, type & specversion) - * - * @return true if this instance represents a valid Cloud Event - */ - public boolean isValidCloudEvent() { - return StringUtils.hasText(this.getId()) - && StringUtils.hasText(this.getSource()) - && StringUtils.hasText(this.getSpecversion()) - && StringUtils.hasText(this.getType()); - } - - String getAttributeName(String attributeName) { - if (this.containsKey(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attributeName)) { - return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attributeName; - } - else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName)) { - return CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName; - } - return attributeName; - } - - private CloudEventAttributes setAttribute(String attrName, String attrValue) { - if (StringUtils.hasText(this.prefixToUse)) { - this.remove(this.getAttributeName(attrName)); - this.put(this.prefixToUse + attrName, attrValue); - } - else { - this.put(this.getAttributeName(attrName), attrValue); - } - return this; - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java deleted file mode 100644 index 721dd91d2..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.cloudevent; - - -/** - * Strategy that should be implemented by the user to help with outgoing Cloud Event attributes. - *

- * The provided `attributes` are already initialized with default values, so you can only set the ones that you need. - *
- * Once implemented, simply configure it as a bean and the framework will invoke it before the outbound Cloud Event Message is finalized. - * - *
{@code
- * @Bean
- * public CloudEventAttributesProvider cloudEventAttributesProvider() {
- * 	return attributes -> {
- *		attributes.setSource("https://interface21.com/").setType("com.interface21");
- *	};
- * }}
- * 
- * - * @author Oleg Zhurakousky - * @author Dave Syer - * - * @since 3.1 - */ -@FunctionalInterface -public interface CloudEventAttributesProvider { - /** - * - * @param attributes instance of {@link CloudEventAttributes} - */ - void generateDefaultCloudEventHeaders(CloudEventAttributes attributes); -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventHeaderEnricher.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventHeaderEnricher.java new file mode 100644 index 000000000..485fef3a1 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventHeaderEnricher.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.cloudevent; + + + +/** + * Strategy that should be implemented by the user to help with outgoing Cloud Event + * headers.
+ *
+ * NOTE: The provided instance of {@link CloudEventMessageBuilder} may or may not be initialized + * with default values, so it is the responsibility of the user to ensure that all required Cloud Events + * attributes are set. That said, Spring frameworks which utilize this interface + * will ensure that the provided {@link CloudEventMessageBuilder} is initialized with default values, leaving + * you responsible to only set the attributes you need.
+ * Once implemented, simply configure it as a bean and the framework will invoke it before + * the outbound Cloud Event Message is finalized. + * + *
+ * @Bean
+ * public CloudEventHeadersProvider cloudEventHeadersProvider() {
+ * 	return attributes ->
+ *		CloudEventHeaderUtils.fromAttributes(attributes).withSource("https://interface21.com/").withType("com.interface21").build();
+ * }
+ * 
+ * + * @author Oleg Zhurakousky + * @author Dave Syer + * @since 2.0 + */ +@FunctionalInterface +public interface CloudEventHeaderEnricher { + + /** + * @param attributes instance of {@link CloudEventContext} + * @return modified {@link CloudEventContext} + */ + CloudEventMessageBuilder enrich(CloudEventMessageBuilder messageBuilder); + +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java new file mode 100644 index 000000000..0d2fba37d --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java @@ -0,0 +1,196 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.cloudevent; + +import java.net.URI; +import java.time.OffsetTime; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.GenericMessage; + +/** + * Message builder which is aware of Cloud Event semantics. + * + * @author Oleg Zhurakousky + * @since 3.1 + */ +public final class CloudEventMessageBuilder { + + protected Log logger = LogFactory.getLog(this.getClass()); + + private final Map headers; + + private T data; + + private CloudEventMessageBuilder(Map headers) { + this.headers = headers == null ? new HashMap<>() : headers; + } + + public static CloudEventMessageBuilder withData(T data) { + CloudEventMessageBuilder builder = new CloudEventMessageBuilder(null); + builder.data = data; + return builder; + } + + @SuppressWarnings("unchecked") + public static CloudEventMessageBuilder fromMessage(Message message) { + CloudEventMessageBuilder builder = new CloudEventMessageBuilder(new HashMap<>(message.getHeaders())); + builder.data = (T) message.getPayload(); + return builder; + } + + public CloudEventMessageBuilder setId(String id) { + this.headers.put(CloudEventMessageUtils.ID, id); + return this; + } + + public CloudEventMessageBuilder setSource(URI uri) { + this.headers.put(CloudEventMessageUtils.SOURCE, uri); + return this; + } + + public CloudEventMessageBuilder setSource(String uri) { + this.headers.put(CloudEventMessageUtils.SOURCE, URI.create(uri)); + return this; + } + + public CloudEventMessageBuilder setSpecVersion(String specversion) { + this.headers.put(CloudEventMessageUtils.SPECVERSION, specversion); + return this; + } + + public CloudEventMessageBuilder setType(String type) { + this.headers.put(CloudEventMessageUtils.TYPE, type); + return this; + } + + public CloudEventMessageBuilder setDataContentType(String dataContentType) { + this.headers.put(CloudEventMessageUtils.DATACONTENTTYPE, dataContentType); + return this; + } + + public CloudEventMessageBuilder setDataSchema(URI dataSchema) { + this.headers.put(CloudEventMessageUtils.DATASCHEMA, dataSchema); + return this; + } + + public CloudEventMessageBuilder setDataSchema(String dataSchema) { + this.headers.put(CloudEventMessageUtils.DATASCHEMA, URI.create(dataSchema)); + return this; + } + + public CloudEventMessageBuilder setSubject(String subject) { + this.headers.put(CloudEventMessageUtils.SUBJECT, subject); + return this; + } + + public CloudEventMessageBuilder copyHeaders(Map headers) { + this.headers.putAll(headers); + return this; + } + + public CloudEventMessageBuilder setTime(OffsetTime time) { + this.headers.put(CloudEventMessageUtils.TIME, time); + return this; + } + + public CloudEventMessageBuilder setTime(String time) { + this.headers.put(CloudEventMessageUtils.TIME, OffsetTime.parse(time)); + return this; + } + + public CloudEventMessageBuilder setHeader(String key, Object value) { + this.headers.put(key, value); + return this; + } + + /** + * Returns a snapshot of the headers {@link Map} at the time this method is called. + * The returned Map is read-only. + * + * @return map of headers + */ + public Map toHeadersMap() { + return Collections.unmodifiableMap(this.headers); + } + + public Message build() { + if (!this.headers.containsKey(CloudEventMessageUtils.SPECVERSION)) { + this.headers.put(CloudEventMessageUtils.SPECVERSION, "1.0"); + } + return this.doBuild(); + } + + public Message build(String attributePrefixToUse) { + String[] keys = this.headers.keySet().toArray(new String[] {}); + for (String key : keys) { + Object value = this.headers.remove(key); + this.headers.put(attributePrefixToUse + key, value); + } + if (!this.headers.containsKey(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION)) { + this.headers.put(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION, "1.0"); + } + return build(); + } + + private Message doBuild() { + this.headers.put("message-type", "cloudevent"); + CloudEventMessageHeaders headers = new CloudEventMessageHeaders(this.headers, this.getUUID(), null); + GenericMessage message = new GenericMessage(data, headers); + return message; + } + + private UUID getUUID() { + UUID id = null; + if (this.headers.containsKey(CloudEventMessageUtils.ID)) { + String stringId = this.headers.get(CloudEventMessageUtils.ID).toString(); + try { + id = UUID.fromString(stringId); + System.out.println(stringId); + System.out.println(id.toString()); + } + catch (Exception e) { + logger.info("Provided Cloud Event 'id' is not compatible with Message 'id' which is UUID, " + + "therefore Cloud Event 'id' will be written as '_id' message header"); + this.headers.put("_" + CloudEventMessageUtils.ID, stringId); + this.headers.remove(CloudEventMessageUtils.ID); + } + } + return id; + } + + private static class CloudEventMessageHeaders extends MessageHeaders { + + /** + * + */ + private static final long serialVersionUID = -6424866731588545945L; + + protected CloudEventMessageHeaders(Map headers, UUID id, Long timestamp) { + super(headers, id, timestamp); + } + + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 36843c4d3..e45cb1f85 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -16,10 +16,12 @@ package org.springframework.cloud.function.cloudevent; -import java.util.HashMap; +import java.lang.reflect.Field; +import java.net.URI; +import java.time.OffsetTime; +import java.util.Collections; import java.util.Map; import java.util.Set; -import java.util.UUID; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -27,19 +29,19 @@ import org.springframework.messaging.converter.ContentTypeResolver; import org.springframework.messaging.converter.DefaultContentTypeResolver; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.support.MessageBuilder; -import org.springframework.util.Assert; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; +import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; /** - * Miscellaneous utility methods to deal with Cloud Events - https://cloudevents.io/. - *
- * Primarily intended for the internal use within the framework; + * Miscellaneous utility methods to assist with representing Cloud Event as Spring + * {@link Message}.
+ * Primarily intended for the internal use within Spring-based frameworks and + * integrations. * * @author Oleg Zhurakousky * @author Dave Syer - * * @since 3.1 */ public final class CloudEventMessageUtils { @@ -61,280 +63,254 @@ public final class CloudEventMessageUtils { public static MimeType APPLICATION_CLOUDEVENTS = MimeTypeUtils.parseMimeType(APPLICATION_CLOUDEVENTS_VALUE); /** - * Default attributes prefix which also suits Kafka. + * Prefix for attributes. */ public static String DEFAULT_ATTR_PREFIX = "ce_"; - /** - * HTTP attributes prefix. - */ - public static String HTTP_ATTR_PREFIX = "ce-"; - /** * AMQP attributes prefix. */ public static String AMQP_ATTR_PREFIX = "cloudEvents:"; + /** + * Prefix for attributes. + */ + public static String HTTP_ATTR_PREFIX = "ce-"; + /** * Value for 'data' attribute. */ public static String DATA = "data"; - /** - * Value for 'data' attribute with prefix. - */ - public static String CANONICAL_DATA = DEFAULT_ATTR_PREFIX + DATA; - /** * Value for 'id' attribute. */ public static String ID = "id"; - /** - * Value for 'id' attribute with prefix. - */ - public static String CANONICAL_ID = DEFAULT_ATTR_PREFIX + ID; - /** * Value for 'source' attribute. */ public static String SOURCE = "source"; - /** - * Value for 'source' attribute with prefix. - */ - public static String CANONICAL_SOURCE = DEFAULT_ATTR_PREFIX + SOURCE; - /** * Value for 'specversion' attribute. */ public static String SPECVERSION = "specversion"; - /** - * Value for 'specversion' attribute with prefix. - */ - public static String CANONICAL_SPECVERSION = DEFAULT_ATTR_PREFIX + SPECVERSION; - /** * Value for 'type' attribute. */ public static String TYPE = "type"; - /** - * Value for 'type' attribute with prefix. - */ - public static String CANONICAL_TYPE = DEFAULT_ATTR_PREFIX + TYPE; - /** * Value for 'datacontenttype' attribute. */ public static String DATACONTENTTYPE = "datacontenttype"; - /** - * Value for 'datacontenttype' attribute with prefix. - */ - public static String CANONICAL_DATACONTENTTYPE = DEFAULT_ATTR_PREFIX + DATACONTENTTYPE; - /** * Value for 'dataschema' attribute. */ public static String DATASCHEMA = "dataschema"; /** - * Value for 'dataschema' attribute with prefix. + * V03 name for 'dataschema' attribute. */ - public static String CANONICAL_DATASCHEMA = DEFAULT_ATTR_PREFIX + DATASCHEMA; + public static final String SCHEMAURL = "schemaurl"; /** * Value for 'subject' attribute. */ public static String SUBJECT = "subject"; - /** - * Value for 'subject' attribute with prefix. - */ - public static String CANONICAL_SUBJECT = DEFAULT_ATTR_PREFIX + SUBJECT; - /** * Value for 'time' attribute. */ public static String TIME = "time"; - /** - * Value for 'time' attribute with prefix. - */ - public static String CANONICAL_TIME = DEFAULT_ATTR_PREFIX + TIME; - - /** - * Checks if {@link Message} represents cloud event in binary-mode. - */ - public static boolean isBinary(Map headers) { - CloudEventAttributes attributes = new CloudEventAttributes(headers); - return attributes.isValidCloudEvent(); + public static String getId(Message message) { + if (message.getHeaders().containsKey("_id")) { + return (String) message.getHeaders().get("_id"); + } + String prefix = determinePrefixToUse(message.getHeaders()); + return (String) message.getHeaders().get(prefix + MessageHeaders.ID); } - /** - * Will construct instance of {@link CloudEventAttributes} setting its required attributes. - * - * @param ce_id value for Cloud Event 'id' attribute - * @param ce_specversion value for Cloud Event 'specversion' attribute - * @param ce_source value for Cloud Event 'source' attribute - * @param ce_type value for Cloud Event 'type' attribute - * @return instance of {@link CloudEventAttributes} - */ - public static CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type) { - Assert.hasText(ce_id, "'ce_id' must not be null or empty"); - Assert.hasText(ce_specversion, "'ce_specversion' must not be null or empty"); - Assert.hasText(ce_source, "'ce_source' must not be null or empty"); - Assert.hasText(ce_type, "'ce_type' must not be null or empty"); - Map requiredAttributes = new HashMap<>(); - requiredAttributes.put(CloudEventMessageUtils.CANONICAL_ID, ce_id); - requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SPECVERSION, ce_specversion); - requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SOURCE, ce_source); - requiredAttributes.put(CloudEventMessageUtils.CANONICAL_TYPE, ce_type); - return new CloudEventAttributes(requiredAttributes); + public static URI getSource(Message message) { + String prefix = determinePrefixToUse(message.getHeaders()); + return safeGetURI(message.getHeaders(), prefix + SOURCE); } - /** - * Will construct instance of {@link CloudEventAttributes} - * Should default/generate cloud event ID and SPECVERSION. - * - * @param ce_source value for Cloud Event 'source' attribute - * @param ce_type value for Cloud Event 'type' attribute - * @return instance of {@link CloudEventAttributes} - */ - public static CloudEventAttributes get(String ce_source, String ce_type) { - return get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type); + public static String getSpecVersion(Message message) { + String prefix = determinePrefixToUse(message.getHeaders()); + return (String) message.getHeaders().get(prefix + SPECVERSION); + } + + public static String getType(Message message) { + String prefix = determinePrefixToUse(message.getHeaders()); + return (String) message.getHeaders().get(prefix + TYPE); + } + + public static String getDataContentType(Message message) { + String prefix = determinePrefixToUse(message.getHeaders()); + return (String) message.getHeaders().get(prefix + DATACONTENTTYPE); + } + + public static URI getDataSchema(Message message) { + String prefix = determinePrefixToUse(message.getHeaders()); + return safeGetURI(message.getHeaders(), prefix + DATASCHEMA); + } + + public static String getSubject(Message message) { + String prefix = determinePrefixToUse(message.getHeaders()); + return (String) message.getHeaders().get(prefix + SUBJECT); + } + + public static OffsetTime getTime(Message message) { + String prefix = determinePrefixToUse(message.getHeaders()); + return (OffsetTime) message.getHeaders().get(prefix + TIME); } - /** - * Will attempt to convert 'inputMessage' to a binary-mode Cloud Event {@link Message}. - * This typically happens when 'inputMessage' represents Cloud Event in structured-mode. - *
- * In the event the message already represents Cloud Event in binary-mode, or this - * message does not represent Cloud Event at all, it will be returned unchanged. - * - * @param inputMessage instance of incoming {@link Message} - * @param messageConverter instance of {@link MessageConverter} to assist with type conversion. - * @return instance of {@link Message} representing Cloud Event in binary-mode or unchanged 'inputMessage'. - */ @SuppressWarnings("unchecked") - public static Message toBinary(Message inputMessage, MessageConverter messageConverter) { + protected static Message toCannonical(Message inputMessage, MessageConverter messageConverter) { - Map headers = inputMessage.getHeaders(); - CloudEventAttributes attributes = new CloudEventAttributes(headers); + Field headersField = ReflectionUtils.findField(MessageHeaders.class, "headers"); + headersField.setAccessible(true); + Map headers = (Map) ReflectionUtils.getField(headersField, inputMessage.getHeaders()); + canonicalizeHeaders(headers); + String inputContentType = (String) inputMessage.getHeaders().get(DATACONTENTTYPE); // first check the obvious and see if content-type is `cloudevents` - if (!attributes.isValidCloudEvent() && headers.containsKey(MessageHeaders.CONTENT_TYPE)) { - MimeType contentType = resolveContentType(inputMessage.getHeaders()); - if (contentType != null && contentType.getType().equals(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType()) - && contentType.getSubtype().startsWith(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype())) { + if (!isBinary(inputMessage) && headers.containsKey(MessageHeaders.CONTENT_TYPE)) { + MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders()); + if (contentType.getType().equals(APPLICATION_CLOUDEVENTS.getType()) && contentType + .getSubtype().startsWith(APPLICATION_CLOUDEVENTS.getSubtype())) { - String dataContentType = StringUtils.hasText(attributes.getDataContentType()) - ? attributes.getDataContentType() + String dataContentType = StringUtils.hasText(inputContentType) ? inputContentType : MimeTypeUtils.APPLICATION_JSON_VALUE; String suffix = contentType.getSubtypeSuffix(); - Assert.hasText(suffix, "Content-type 'suffix' can not be determined from " + contentType); MimeType cloudEventDeserializationContentType = MimeTypeUtils .parseMimeType(contentType.getType() + "/" + suffix); Message cloudEventMessage = MessageBuilder.fromMessage(inputMessage) .setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType) - .setHeader(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, dataContentType) - .build(); - Map structuredCloudEvent = (Map) messageConverter.fromMessage(cloudEventMessage, Map.class); - Message binaryCeMessage = buildCeMessageFromStructured(structuredCloudEvent, inputMessage.getHeaders()); + .setHeader(DATACONTENTTYPE, dataContentType).build(); + Map structuredCloudEvent = (Map) messageConverter + .fromMessage(cloudEventMessage, Map.class); + + canonicalizeHeaders(structuredCloudEvent); + Message binaryCeMessage = buildBinaryMessageFromStructuredMap(structuredCloudEvent, + inputMessage.getHeaders()); + return binaryCeMessage; } } - else if (StringUtils.hasText(attributes.getDataContentType())) { - return MessageBuilder.fromMessage(inputMessage) - .setHeader(MessageHeaders.CONTENT_TYPE, attributes.getDataContentType()) - .build(); + else if (StringUtils.hasText(inputContentType)) { // this needs thinking since . . + return MessageBuilder.fromMessage(inputMessage).setHeader(MessageHeaders.CONTENT_TYPE, inputContentType) + .build(); } return inputMessage; } - private static MimeType resolveContentType(MessageHeaders headers) { - try { - return contentTypeResolver.resolve(headers); - } - catch (Exception e) { - // ignore - } - return null; - } /** - * Will attempt to determine based on the headers the origin of Message (e.g., HTTP, Kafka etc) - * and based on this designate prefix to be used for Cloud Events attributes (i.e., `ce-` or `ce_` etc). + * Determines attribute prefix based on the presence of certain well defined headers. * - * @param messageHeaders instance of {@link MessageHeaders} - * @return prefix to be used for Cloud Events attributes + * TODO work in progress as it needs to be refined + * + * @param messageHeaders map of message headers + * @return prefix (e.g., 'ce_' or 'ce-' etc.) */ - public static String determinePrefixToUse(MessageHeaders messageHeaders) { + protected static String determinePrefixToUse(Map messageHeaders) { Set keys = messageHeaders.keySet(); if (keys.contains("user-agent")) { - return CloudEventMessageUtils.HTTP_ATTR_PREFIX; - } - else if (keys.contains("amqp")) { - return CloudEventMessageUtils.AMQP_ATTR_PREFIX; + return HTTP_ATTR_PREFIX; } else { - return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; // default which also suits Kafka 'ce_' + for (String key : messageHeaders.keySet()) { + if (key.startsWith("kafka_")) { + return DEFAULT_ATTR_PREFIX; + } + else if (key.startsWith("amqp_")) { + return AMQP_ATTR_PREFIX; + } + else if (key.startsWith(DEFAULT_ATTR_PREFIX)) { + return DEFAULT_ATTR_PREFIX; + } + else if (key.startsWith(HTTP_ATTR_PREFIX)) { + return HTTP_ATTR_PREFIX; + } + else if (key.startsWith(AMQP_ATTR_PREFIX)) { + return AMQP_ATTR_PREFIX; + } + } } + + return ""; } /** - * Typically called by Consumer. - + * Will check for the existence of required attributes. Assumes attributes (headers) + * are in canonical form. + * @param message input {@link Message} + * @return true if this Message represents Cloud Event in binary-mode */ - public static CloudEventAttributes generateAttributes(Message message, CloudEventAttributesProvider provider) { - CloudEventAttributes attributes = generateDefaultAttributeValues(new CloudEventAttributes(message.getHeaders()), - message.getPayload().getClass().getName().getClass().getName(), message.getPayload().getClass().getName().getClass().getName()); - provider.generateDefaultCloudEventHeaders(attributes); - return attributes; + protected static boolean isBinary(Message message) { + return message.getHeaders().containsKey(SPECVERSION) + && message.getHeaders().containsKey(TYPE) + && message.getHeaders().containsKey(SOURCE); } - public static CloudEventAttributes generateAttributes(Message inputMessage, String typeName, String sourceName) { - CloudEventAttributes attributes = new CloudEventAttributes(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage.getHeaders())); - return generateDefaultAttributeValues(attributes, sourceName, typeName); + /** + * Will canonicalize Cloud Event attributes (headers) by removing well known prefixes. + * So, for example 'ce_source' will become 'source'. + * @param headers message headers + */ + private static void canonicalizeHeaders(Map headers) { + String[] keys = headers.keySet().toArray(new String[] {}); + for (String key : keys) { + if (key.startsWith(HTTP_ATTR_PREFIX)) { + Object value = headers.remove(key); + key = key.substring(HTTP_ATTR_PREFIX.length()); + headers.put(key, value); + } + else if (key.startsWith(DEFAULT_ATTR_PREFIX)) { + Object value = headers.remove(key); + key = key.substring(DEFAULT_ATTR_PREFIX.length()); + headers.put(key, value); + } + else if (key.startsWith(AMQP_ATTR_PREFIX)) { + Object value = headers.remove(key); + key = key.substring(AMQP_ATTR_PREFIX.length()); + headers.put(key, value); + } + } } - private static Message buildCeMessageFromStructured(Map structuredCloudEvent, MessageHeaders originalHeaders) { - String prefixToUse = determinePrefixToUse(originalHeaders); - Object data = null; - if (structuredCloudEvent.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA)) { - data = structuredCloudEvent.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA); - structuredCloudEvent.remove(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA); + private static Message buildBinaryMessageFromStructuredMap(Map structuredCloudEvent, + MessageHeaders originalHeaders) { + Object payload = structuredCloudEvent.remove(DATA); + if (payload == null) { + payload = Collections.emptyMap(); } - else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.CANONICAL_DATA)) { - data = structuredCloudEvent.get(CloudEventMessageUtils.CANONICAL_DATA); - structuredCloudEvent.remove(CloudEventMessageUtils.CANONICAL_DATA); + + CloudEventMessageBuilder messageBuilder = CloudEventMessageBuilder + .withData(payload) + .copyHeaders(structuredCloudEvent); + + for (String key : originalHeaders.keySet()) { + if (!MessageHeaders.ID.equals(key)) { + messageBuilder.setHeader(key, originalHeaders.get(key)); + } } - else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.DATA)) { - data = structuredCloudEvent.get(CloudEventMessageUtils.DATA); - structuredCloudEvent.remove(CloudEventMessageUtils.DATA); - } - Assert.notNull(data, "'data' must not be null"); - MessageBuilder builder = MessageBuilder.withPayload(data); - CloudEventAttributes attributes = new CloudEventAttributes(structuredCloudEvent); - builder.setHeader(prefixToUse + CloudEventMessageUtils.ID, attributes.getId()); - builder.setHeader(prefixToUse + CloudEventMessageUtils.SOURCE, attributes.getSource()); - builder.setHeader(prefixToUse + CloudEventMessageUtils.TYPE, attributes.getType()); - builder.setHeader(prefixToUse + CloudEventMessageUtils.SPECVERSION, attributes.getSpecversion()); - builder.copyHeaders(originalHeaders); - return builder.build(); + + return messageBuilder.build(); } - private static CloudEventAttributes generateDefaultAttributeValues(CloudEventAttributes attributes, String source, String type) { - if (attributes.isValidCloudEvent()) { - return attributes - .setSpecversion("1.0") - .setId(UUID.randomUUID().toString()) - .setType(type) - .setSource(source); + private static URI safeGetURI(Map map, String key) { + Object uri = map.get(key); + if (uri != null && uri instanceof String) { + uri = URI.create((String) uri); } - return attributes; + return (URI) uri; } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java new file mode 100644 index 000000000..f595694ad --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java @@ -0,0 +1,45 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.cloudevent; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.lang.Nullable; + +/** + * + * @author Oleg Zhurakousky + * @since 3.1 + */ +@Configuration(proxyBeanMethods = false) +class CloudEventsFunctionExtensionConfiguration { + + @Bean + @ConditionalOnMissingClass("io.cloudevents.CloudEvent") + public CloudEventsFunctionInvocationHelper nativeFunctionInvocationHelper(@Nullable CloudEventHeaderEnricher cloudEventHeadersProvider) { + return new CloudEventsFunctionInvocationHelper(cloudEventHeadersProvider); + } + + @Bean + @ConditionalOnClass(name = "io.cloudevents.CloudEvent") + public CloudEventsFunctionInvocationHelper sdkFunctionInvocationHelper() { + // TODO you may need SDKs header provider + return null; + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java new file mode 100644 index 000000000..c45c010bf --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java @@ -0,0 +1,101 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.cloudevent; + +import java.net.URI; +import java.util.UUID; + +import org.springframework.beans.BeansException; +import org.springframework.cloud.function.core.FunctionInvocationHelper; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.StringUtils; + +/** + * + * @author Oleg Zhurakousky + * @since 2.0 + * + */ +class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper>, ApplicationContextAware { + + private ConfigurableApplicationContext applicationContext; + + private final CloudEventHeaderEnricher cloudEventAttributesProvider; + + CloudEventsFunctionInvocationHelper(@Nullable CloudEventHeaderEnricher cloudEventHeadersProvider) { + this.cloudEventAttributesProvider = cloudEventHeadersProvider; + } + + @Override + public boolean isRetainOuputAsMessage(Message message) { + if (message.getHeaders().containsKey("message-type") && message.getHeaders().get("message-type").equals("cloudevent")) { + return true; + } + return false; + } + + @Override + public Message preProcessInput(Message input, Object inputConverter) { + return CloudEventMessageUtils.toCannonical(input, (MessageConverter) inputConverter); + } + + @Override + public Message postProcessResult(Message input, Object result) { + Message resultMessage = null; + if (CloudEventMessageUtils.isBinary(input)) { + CloudEventMessageBuilder messageBuilder = CloudEventMessageBuilder + .withData(result) + .setId(UUID.randomUUID().toString()) + .setSource(URI.create("http://spring.io/" + getApplicationName())) + .setType(result.getClass().getName()); + + if (this.cloudEventAttributesProvider != null) { + messageBuilder = this.cloudEventAttributesProvider.enrich(messageBuilder); + } + + String prefix = this.determineOutputPrefix(input); + resultMessage = messageBuilder.build(prefix); + } + else { + resultMessage = MessageBuilder.withPayload(result).build(); + } + return resultMessage; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = (ConfigurableApplicationContext) applicationContext; + } + + private String determineOutputPrefix(Message input) { + //TODO rework to actually figure out where output goes instead of relying on input + return CloudEventMessageUtils.determinePrefixToUse(input.getHeaders()); + } + + private String getApplicationName() { + ConfigurableEnvironment environment = this.applicationContext.getEnvironment(); + String name = environment.getProperty("spring.application.name"); + return (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId()); + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index 60261c1a1..100294f83 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -20,7 +20,6 @@ import java.lang.reflect.Method; import java.lang.reflect.Type; import java.util.Arrays; import java.util.Set; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -32,26 +31,22 @@ import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; -import org.springframework.cloud.function.cloudevent.CloudEventAttributes; -import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; -import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; +import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.convert.ConversionService; -import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.converter.CompositeMessageConverter; -import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StringUtils; /** - * Implementation of {@link FunctionRegistry} capable of discovering functioins in - * {@link BeanFactory}. + * Implementation of {@link FunctionRegistry} capable of discovering functioins in {@link BeanFactory}. * * @author Oleg Zhurakousky */ @@ -59,19 +54,14 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp private GenericApplicationContext applicationContext; - private CloudEventAttributesProvider cloudEventAtttributesProvider; - - public BeanFactoryAwareFunctionRegistry(ConversionService conversionService, - CompositeMessageConverter messageConverter, JsonMapper jsonMapper) { - super(conversionService, messageConverter, jsonMapper); + public BeanFactoryAwareFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, + JsonMapper jsonMapper, @Nullable FunctionInvocationHelper> functionInvocationHelper) { + super(conversionService, messageConverter, jsonMapper, functionInvocationHelper); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (GenericApplicationContext) applicationContext; - if (applicationContext.getBeanNamesForType(CloudEventAttributesProvider.class).length > 0) { - this.cloudEventAtttributesProvider = applicationContext.getBean(CloudEventAttributesProvider.class); - } } /* @@ -80,9 +70,10 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp */ @Override public int size() { - return this.applicationContext.getBeanNamesForType(Supplier.class).length - + this.applicationContext.getBeanNamesForType(Function.class).length - + this.applicationContext.getBeanNamesForType(Consumer.class).length + super.size(); + return this.applicationContext.getBeanNamesForType(Supplier.class).length + + this.applicationContext.getBeanNamesForType(Function.class).length + + this.applicationContext.getBeanNamesForType(Consumer.class).length + + super.size(); } /* @@ -92,9 +83,12 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp public Set getNames(Class type) { Set registeredNames = super.getNames(type); if (type == null) { - registeredNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Function.class))); - registeredNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Supplier.class))); - registeredNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Consumer.class))); + registeredNames + .addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Function.class))); + registeredNames + .addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Supplier.class))); + registeredNames + .addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Consumer.class))); } else { registeredNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(type))); @@ -105,8 +99,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public T lookup(Class type, String functionDefinition, String... expectedOutputMimeTypes) { - functionDefinition = StringUtils.hasText(functionDefinition) ? functionDefinition - : this.applicationContext.getEnvironment().getProperty(FunctionProperties.FUNCTION_DEFINITION, ""); + functionDefinition = StringUtils.hasText(functionDefinition) + ? functionDefinition + : this.applicationContext.getEnvironment().getProperty(FunctionProperties.FUNCTION_DEFINITION, ""); functionDefinition = this.normalizeFunctionDefinition(functionDefinition); if (!StringUtils.hasText(functionDefinition)) { @@ -117,8 +112,7 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp if (function == null) { Set functionRegistratioinNames = super.getNames(null); - String[] functionNames = StringUtils - .delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|"); + String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|"); for (String functionName : functionNames) { if (functionRegistratioinNames.contains(functionName)) { logger.info("Skipping function '" + functionName + "' since it is already present"); @@ -132,31 +126,26 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp functionRegistration = (FunctionRegistration) functionCandidate; } else if (this.isFunctionPojo(functionCandidate, functionName)) { - Method functionalMethod = FunctionTypeUtils - .discoverFunctionalMethod(functionCandidate.getClass()); + Method functionalMethod = FunctionTypeUtils.discoverFunctionalMethod(functionCandidate.getClass()); functionCandidate = this.proxyTarget(functionCandidate, functionalMethod); functionType = FunctionTypeUtils.fromFunctionMethod(functionalMethod); } else if (this.isSpecialFunctionRegistration(functionNames, functionName)) { - functionRegistration = this.applicationContext.getBean( - functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, - FunctionRegistration.class); + functionRegistration = this.applicationContext + .getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class); } else { - functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, - this.applicationContext); + functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext); } if (functionRegistration == null) { - functionRegistration = new FunctionRegistration(functionCandidate, functionName) - .type(functionType); + functionRegistration = new FunctionRegistration(functionCandidate, functionName).type(functionType); } this.register(functionRegistration); } else { if (logger.isDebugEnabled()) { - logger.debug("Function '" + functionName - + "' is not available in FunctionCatalog or BeanFactory"); + logger.debug("Function '" + functionName + "' is not available in FunctionCatalog or BeanFactory"); } } } @@ -164,36 +153,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes); } - if (function != null) { - BiFunction, Object, Message> invocationResultHeaderEnricher = new BiFunction, Object, Message>() { - @Override - public Message apply(Message inputMessage, Object invocationResult) { - // TODO: Factor it out! Cloud Events specific code - CloudEventAttributes generatedCeHeaders = CloudEventMessageUtils.generateAttributes(inputMessage, - invocationResult.getClass().getName(), getApplicationName()); - CloudEventAttributes attributes = new CloudEventAttributes(generatedCeHeaders, - CloudEventMessageUtils.determinePrefixToUse(inputMessage.getHeaders())); - if (cloudEventAtttributesProvider != null) { - cloudEventAtttributesProvider.generateDefaultCloudEventHeaders(attributes); - } - Message message = MessageBuilder.withPayload(invocationResult).copyHeaders(attributes).build(); - - return message; - } - }; - function.setOutputMessageHeaderEnricher(invocationResultHeaderEnricher); - } - return (T) function; } - private String getApplicationName() { - ConfigurableEnvironment environment = this.applicationContext.getEnvironment(); - String name = environment.getProperty("spring.application.name"); - return "http://spring.io/" - + (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId()); - } - private Object discoverFunctionInBeanFactory(String functionName) { Object functionCandidate = null; if (this.applicationContext.containsBean(functionName)) { @@ -201,8 +163,7 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp } else { try { - functionCandidate = BeanFactoryAnnotationUtils - .qualifiedBeanOfType(this.applicationContext.getBeanFactory(), Object.class, functionName); + functionCandidate = BeanFactoryAnnotationUtils.qualifiedBeanOfType(this.applicationContext.getBeanFactory(), Object.class, functionName); } catch (Exception e) { // ignore since there is no safe isAvailable-kind of method @@ -217,19 +178,21 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp } private boolean isFunctionPojo(Object functionCandidate, String functionName) { - return !functionCandidate.getClass().isSynthetic() && !(functionCandidate instanceof Supplier) - && !(functionCandidate instanceof Function) && !(functionCandidate instanceof Consumer) - && !this.applicationContext.containsBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX); + return !functionCandidate.getClass().isSynthetic() + && !(functionCandidate instanceof Supplier) + && !(functionCandidate instanceof Function) + && !(functionCandidate instanceof Consumer) + && !this.applicationContext.containsBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX); } /** - * At the moment 'special function registration' simply implies that a bean under the - * provided functionName may have already been wrapped and registered as - * FunuctionRegistration with BeanFactory under the name of the function suffixed with - * {@link FunctionRegistration#REGISTRATION_NAME_SUFFIX} (e.g., - * 'myKotlinFunction_registration').
- *
+ * At the moment 'special function registration' simply implies that a bean under the provided functionName + * may have already been wrapped and registered as FunuctionRegistration with BeanFactory under the name of + * the function suffixed with {@link FunctionRegistration#REGISTRATION_NAME_SUFFIX} + * (e.g., 'myKotlinFunction_registration'). + *

* At the moment only Kotlin module does this + * * @param functionCandidate candidate for FunctionInvocationWrapper instance * @param functionName the name of the function * @return true if this function candidate qualifies @@ -250,5 +213,4 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp }); return pf.getProxy(); } - } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index fc2b56d8b..b801e6a8d 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeSet; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -46,13 +45,12 @@ import reactor.util.function.Tuples; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.function.cloudevent.CloudEventAttributes; -import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.config.RoutingFunction; +import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.core.ResolvableType; import org.springframework.core.convert.ConversionService; @@ -98,10 +96,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect private final JsonMapper jsonMapper; + private final FunctionInvocationHelper> functionInvocationHelper; + @Autowired(required = false) private FunctionAroundWrapper functionAroundWrapper; - public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper) { + public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper, + @Nullable FunctionInvocationHelper> functionInvocationHelper) { Assert.notNull(messageConverter, "'messageConverter' must not be null"); Assert.notNull(jsonMapper, "'jsonMapper' must not be null"); this.conversionService = conversionService; @@ -109,6 +110,11 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect this.messageConverter = messageConverter; this.headersField = ReflectionUtils.findField(MessageHeaders.class, "headers"); this.headersField.setAccessible(true); + this.functionInvocationHelper = functionInvocationHelper; + } + + public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper) { + this(conversionService, messageConverter, jsonMapper, null); } @SuppressWarnings("unchecked") @@ -322,11 +328,11 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ private Function enhancer; - private BiFunction, Object, Message> outputMessageHeaderEnricher; - - void setOutputMessageHeaderEnricher(BiFunction, Object, Message> outputMessageHeaderEnricher) { - this.outputMessageHeaderEnricher = outputMessageHeaderEnricher; - } +// private BiFunction, Object, Message> outputMessageHeaderEnricher; +// +// void setOutputMessageHeaderEnricher(BiFunction, Object, Message> outputMessageHeaderEnricher) { +// this.outputMessageHeaderEnricher = outputMessageHeaderEnricher; +// } FunctionInvocationWrapper(FunctionInvocationWrapper function) { this.target = function.target; @@ -623,8 +629,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v)); } else { - if (this.outputMessageHeaderEnricher != null) { - result = this.outputMessageHeaderEnricher.apply((Message) input, result); + if (functionInvocationHelper != null) { + result = functionInvocationHelper.postProcessResult((Message) input, result); } else { result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build(); @@ -823,7 +829,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect return null; } - input = CloudEventMessageUtils.toBinary((Message) input, messageConverter); + if (functionInvocationHelper != null) { + input = functionInvocationHelper.preProcessInput((Message) input, messageConverter); + } convertedInput = this.convertInputMessageIfNecessary((Message) input, type); if (convertedInput == null) { // give ConversionService a chance @@ -910,7 +918,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect * case that requires it since it may contain forwarding url */ private boolean containsRetainMessageSignalInHeaders(Message message) { - if (new CloudEventAttributes(message.getHeaders()).isValidCloudEvent()) { + if (functionInvocationHelper != null && functionInvocationHelper.isRetainOuputAsMessage(message)) { return true; } else { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index f03158e50..161d6cae3 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -34,6 +34,7 @@ import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry; +import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.cloud.function.json.GsonMapper; import org.springframework.cloud.function.json.JacksonMapper; import org.springframework.cloud.function.json.JsonMapper; @@ -47,6 +48,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.FilterType; import org.springframework.core.convert.converter.GenericConverter; import org.springframework.core.convert.support.ConfigurableConversionService; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; import org.springframework.messaging.converter.ByteArrayMessageConverter; import org.springframework.messaging.converter.CompositeMessageConverter; import org.springframework.messaging.converter.MessageConverter; @@ -71,7 +74,8 @@ public class ContextFunctionCatalogAutoConfiguration { static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper"; @Bean - public FunctionRegistry functionCatalog(List messageConverters, JsonMapper jsonMapper, ConfigurableApplicationContext context) { + public FunctionRegistry functionCatalog(List messageConverters, JsonMapper jsonMapper, + ConfigurableApplicationContext context, @Nullable FunctionInvocationHelper> functionInvocationHelper) { ConfigurableConversionService conversionService = (ConfigurableConversionService) context.getBeanFactory().getConversionService(); Map converters = context.getBeansOfType(GenericConverter.class); for (GenericConverter converter : converters.values()) { @@ -105,7 +109,7 @@ public class ContextFunctionCatalogAutoConfiguration { messageConverter = new SmartCompositeMessageConverter(mcList); } - return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper); + return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper, functionInvocationHelper); } @Bean(RoutingFunction.FUNCTION_NAME) diff --git a/spring-cloud-function-context/src/main/resources/META-INF/spring.factories b/spring-cloud-function-context/src/main/resources/META-INF/spring.factories index 1dbbeee8d..fa3023fb4 100644 --- a/spring-cloud-function-context/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-function-context/src/main/resources/META-INF/spring.factories @@ -1,6 +1,7 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration +org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration,\ +org.springframework.cloud.function.cloudevent.CloudEventsFunctionExtensionConfiguration org.springframework.cloud.function.context.WrapperDetector=\ org.springframework.cloud.function.context.config.FluxWrapperDetector org.springframework.context.ApplicationContextInitializer=\ -org.springframework.cloud.function.context.config.ContextFunctionCatalogInitializer +org.springframework.cloud.function.context.config.ContextFunctionCatalogInitializer \ No newline at end of file diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java index 96a5661bf..ff9bf4b0b 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java @@ -16,7 +16,9 @@ package org.springframework.cloud.function.cloudevent; +import java.net.URI; import java.text.SimpleDateFormat; +import java.util.UUID; import java.util.function.Function; import org.junit.jupiter.api.Test; @@ -29,7 +31,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.MessageBuilder; import static org.assertj.core.api.Assertions.assertThat; @@ -42,12 +43,50 @@ public class CloudEventFunctionTests { @SuppressWarnings("unchecked") @Test - public void testBinaryPojoToPojoDefaultOutputAttributeProvider() { + public void testBinaryPojoToPojoDefaultOutputHeaderProvider() { Function function = this.lookup("echo", TestConfiguration.class); - Message inputMessage = MessageBuilder.withPayload("{\"name\":\"Ricky\"}") - .copyHeaders(CloudEventMessageUtils.get("https://spring.io/", "org.springframework")).build(); - assertThat(CloudEventMessageUtils.isBinary(inputMessage.getHeaders())).isTrue(); + String id = UUID.randomUUID().toString(); + + Message inputMessage = CloudEventMessageBuilder + .withData("{\"name\":\"Ricky\"}") + .setId(id) + .setSource("https://spring.io/") + .setType("org.springframework") + .build(); + + assertThat(inputMessage.getHeaders().getId()).isEqualTo(UUID.fromString(id)); + assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isTrue(); + + Message resultMessage = (Message) function.apply(inputMessage); + + + /* + * Validates that although user only deals with POJO, the framework recognizes + * both on input and output that it is dealing with Cloud Event and generates + * appropriate headers/attributes + */ + assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue(); + assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName()); + assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application")); + } + + // this kind of emulates that message came from Kafka + @SuppressWarnings("unchecked") + @Test + public void testBinaryPojoToPojoDefaultOutputHeaderProviderWithPrefix() { + Function function = this.lookup("echo", TestConfiguration.class); + + String id = UUID.randomUUID().toString(); + + Message inputMessage = CloudEventMessageBuilder + .withData("{\"name\":\"Ricky\"}") + .setHeader("ce_id", id) + .setHeader("ce_source", "https://spring.io/") + .setHeader("ce_type", "org.springframework") + .build(); + +// assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isTrue(); Message resultMessage = (Message) function.apply(inputMessage); @@ -56,10 +95,9 @@ public class CloudEventFunctionTests { * both on input and output that it is dealing with Cloud Event and generates * appropriate headers/attributes */ - CloudEventAttributes attributes = new CloudEventAttributes(resultMessage.getHeaders()); - assertThat(attributes.isValidCloudEvent()).isTrue(); - assertThat((String) attributes.getType()).isEqualTo(Person.class.getName()); - assertThat((String) attributes.getSource()).isEqualTo("http://spring.io/application-application"); + assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue(); + assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName()); + assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application")); } @SuppressWarnings("unchecked") @@ -79,24 +117,25 @@ public class CloudEventFunctionTests { "}"; Function function = this.lookup("springRelease", TestConfiguration.class); - Message inputMessage = MessageBuilder.withPayload(payload) + Message inputMessage = CloudEventMessageBuilder + .withData(payload) .setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") .build(); - assertThat(CloudEventMessageUtils.isBinary(inputMessage.getHeaders())).isFalse(); + + assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isFalse(); Message resultMessage = (Message) function.apply(inputMessage); assertThat(resultMessage.getPayload().getReleaseDate()) .isEqualTo(new SimpleDateFormat("dd-MM-yyyy").parse("01-10-2006")); assertThat(resultMessage.getPayload().getVersion()).isEqualTo("2.0"); - /* - * Validates that although user only deals with POJO, the framework recognizes - * both on input and output that it is dealing with Cloud Event and generates - * appropriate headers/attributes - */ - CloudEventAttributes attributes = new CloudEventAttributes(resultMessage.getHeaders()); - assertThat(attributes.isValidCloudEvent()).isTrue(); - assertThat((String) attributes.getType()).isEqualTo(SpringReleaseEvent.class.getName()); - assertThat((String) attributes.getSource()).isEqualTo("http://spring.io/application-application"); +// /* +// * Validates that although user only deals with POJO, the framework recognizes +// * both on input and output that it is dealing with Cloud Event and generates +// * appropriate headers/attributes +// */ + assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue(); + assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName()); + assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application")); } @SuppressWarnings("unchecked") @@ -115,10 +154,11 @@ public class CloudEventFunctionTests { "}"; Function function = this.lookup("springRelease", TestConfiguration.class); - Message inputMessage = MessageBuilder.withPayload(payload) + Message inputMessage = CloudEventMessageBuilder + .withData(payload) .setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") .build(); - assertThat(CloudEventMessageUtils.isBinary(inputMessage.getHeaders())).isFalse(); + assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isFalse(); Message resultMessage = (Message) function.apply(inputMessage); assertThat(resultMessage.getPayload().getReleaseDate()) @@ -129,10 +169,9 @@ public class CloudEventFunctionTests { * both on input and output that it is dealing with Cloud Event and generates * appropriate headers/attributes */ - CloudEventAttributes attributes = new CloudEventAttributes(resultMessage.getHeaders()); - assertThat(attributes.isValidCloudEvent()).isTrue(); - assertThat((String) attributes.getType()).isEqualTo(SpringReleaseEvent.class.getName()); - assertThat((String) attributes.getSource()).isEqualTo("http://spring.io/application-application"); + assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue(); + assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName()); + assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application")); } private Function lookup(String functionDefinition, Class... configClass) { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java deleted file mode 100644 index 67e2b5cf8..000000000 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.cloudevent; - -import java.lang.reflect.Field; -import java.util.UUID; - -import org.junit.jupiter.api.Test; - -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.builder.SpringApplicationBuilder; -import org.springframework.cloud.function.context.FunctionCatalog; -import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry; -import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter; -import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.Configuration; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.util.MimeTypeUtils; -import org.springframework.util.ReflectionUtils; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * - * @author Oleg Zhurakousky - * - */ -public class CloudEventTypeConversionTests { - @Test - public void testFromMessageBinaryPayloadMatchesType() { - SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils - .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); - ceAttributes.setDataContentType("text/plain"); - Message message = MessageBuilder.withPayload("Hello Ricky").copyHeaders(ceAttributes).build(); - - String converted = (String) messageConverter.fromMessage(message, String.class); - assertThat(converted).isEqualTo("Hello Ricky"); - } - - @Test - public void testFromMessageBinaryPayloadDoesNotMatchType() { - SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils - .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); - Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) - .copyHeaders(ceAttributes) - .setHeader(MessageHeaders.CONTENT_TYPE, - MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8")) - .build(); - String converted = (String) messageConverter.fromMessage(message, String.class); - assertThat(converted).isEqualTo("Hello Ricky"); - } - - @Test // JsonMessageConverter does some special things between byte[] and String so this works - public void testFromMessageBinaryPayloadNoDataContentTypeToString() { - SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils - .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); - Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) - .copyHeaders(ceAttributes) - .setHeader(MessageHeaders.CONTENT_TYPE, - MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8")) - .build(); - String converted = (String) messageConverter.fromMessage(message, String.class); - assertThat(converted).isEqualTo("Hello Ricky"); - } - - @Test // Unlike the previous test the type here is POJO so no special treatement - public void testFromMessageBinaryPayloadNoDataContentTypeToPOJO() { - SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); - Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) - .copyHeaders(ceAttributes) - .setHeader(MessageHeaders.CONTENT_TYPE, - MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8")) - .build(); - String converted = (String) messageConverter.fromMessage(message, Person.class); - assertThat(converted).isNull(); - } - - @Test // will fall on default CT which is json - public void testFromMessageBinaryPayloadNoDataContentTypeToPOJOThatWorks() { - SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); - Message message = MessageBuilder.withPayload("{\"name\":\"Ricky\"}".getBytes()) - .copyHeaders(ceAttributes) - .setHeader(MessageHeaders.CONTENT_TYPE, - MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8")) - .build(); - Person converted = (Person) messageConverter.fromMessage(message, Person.class); - assertThat(converted.getName()).isEqualTo("Ricky"); - } - - private SmartCompositeMessageConverter configure(Class... configClass) { - ApplicationContext context = new SpringApplicationBuilder(configClass).run( - "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.main.lazy-initialization=true"); - FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - Field f = ReflectionUtils.findField(BeanFactoryAwareFunctionRegistry.class, "messageConverter"); - f.setAccessible(true); - try { - SmartCompositeMessageConverter messageConverter = (SmartCompositeMessageConverter) f.get(catalog); - return messageConverter; - } - catch (Exception e) { - throw new IllegalStateException(e); - } - } - - @EnableAutoConfiguration - @Configuration - public static class DummyConfiguration { - } - - public static class Person { - private String name; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - } -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionInvocationHelper.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionInvocationHelper.java new file mode 100644 index 000000000..b0649912a --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionInvocationHelper.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.springframework.cloud.function.core; + + +/** + * + * @author Oleg Zhurakousky + * + */ +public interface FunctionInvocationHelper { + + boolean isRetainOuputAsMessage(I input); + + I preProcessInput(I input, Object inputConverter); + + I postProcessResult(I input, Object result); +} diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java index 2ac2565ca..ef44106a1 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java @@ -25,13 +25,13 @@ import java.util.function.Function; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.client.RestTemplateBuilder; -import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; +import org.springframework.cloud.function.cloudevent.CloudEventHeaderEnricher; +import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder; import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.web.util.HeaderUtils; import org.springframework.context.annotation.Bean; import org.springframework.http.RequestEntity; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -104,9 +104,9 @@ public class CloudeventDemoApplication { } @Bean - public CloudEventAttributesProvider cloudEventAttributesProvider() { - return attributes -> { - attributes.setSource("https://interface21.com/").setType("com.interface21"); + public CloudEventHeaderEnricher cloudEventHeaderEnricher() { + return headers -> { + return headers.setSource("https://interface21.com/").setType("com.interface21"); }; } @@ -129,11 +129,11 @@ public class CloudeventDemoApplication { } @Bean - public Consumer> pojoConsumer(CloudEventAttributesProvider provider, RestTemplateBuilder builder) { + public Consumer> pojoConsumer(CloudEventHeaderEnricher enricher, RestTemplateBuilder builder) { return eventMessage -> { + Message newMessage = enricher.enrich(CloudEventMessageBuilder.fromMessage(eventMessage)).build(CloudEventMessageUtils.HTTP_ATTR_PREFIX); RequestEntity entity = RequestEntity.post(URI.create("http://foo.com")) - .headers(HeaderUtils.fromMessage( - new MessageHeaders(CloudEventMessageUtils.generateAttributes(eventMessage, provider)))) + .headers(HeaderUtils.fromMessage(newMessage.getHeaders())) .body(eventMessage.getPayload()); List sourceHeader = entity.getHeaders().get("ce-source"); Assert.isTrue(sourceHeader.get(0).equals("https://interface21.com/"), "'source' must be https://interface21.com/"); diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java index acefaf273..e36788d10 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java @@ -21,6 +21,7 @@ import java.util.function.Function; import org.junit.jupiter.api.Test; import org.springframework.boot.WebApplicationType; import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder; import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.context.ConfigurableApplicationContext; @@ -40,9 +41,11 @@ public class CloudeventDemoApplicationFunctionTests { try(ConfigurableApplicationContext context = new SpringApplicationBuilder(CloudeventDemoApplication.class) .web(WebApplicationType.NONE).run()) { FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - Message binaryCloudEventMessage = MessageBuilder - .withPayload("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") - .copyHeaders(CloudEventMessageUtils.get("spring.io/spring-event", "com.example.springevent")) + + Message inputMessage = CloudEventMessageBuilder + .withData("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") + .setSource("https://spring.io/spring-event") + .setType("com.example.springevent") .build(); /* @@ -51,16 +54,16 @@ public class CloudeventDemoApplicationFunctionTests { * inside spring-cloud-function. */ Function, Message> asPojoMessage = catalog.lookup("asPOJOMessage"); - System.out.println(asPojoMessage.apply(binaryCloudEventMessage)); + System.out.println(asPojoMessage.apply(inputMessage)); Function, Message> asPojo = catalog.lookup("asPOJO"); - System.out.println(asPojo.apply(binaryCloudEventMessage)); + System.out.println(asPojo.apply(inputMessage)); Function, Message> asString = catalog.lookup("asString"); - System.out.println(asString.apply(binaryCloudEventMessage)); + System.out.println(asString.apply(inputMessage)); Function, Message> asStringMessage = catalog.lookup("asStringMessage"); - System.out.println(asStringMessage.apply(binaryCloudEventMessage)); + System.out.println(asStringMessage.apply(inputMessage)); } } @@ -69,9 +72,11 @@ public class CloudeventDemoApplicationFunctionTests { try(ConfigurableApplicationContext context = new SpringApplicationBuilder(CloudeventDemoApplication.class) .web(WebApplicationType.NONE).run()) { FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - Message binaryCloudEventMessage = MessageBuilder - .withPayload("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") - .copyHeaders(CloudEventMessageUtils.get("spring.io/spring-event", "com.example.springevent")) + + Message inputMessage = CloudEventMessageBuilder + .withData("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") + .setSource("https://spring.io/spring-event") + .setType("com.example.springevent") .build(); /* @@ -80,7 +85,7 @@ public class CloudeventDemoApplicationFunctionTests { * inside spring-cloud-function. */ Function, Message> asPojoMessage = catalog.lookup("consumeAndProduceCloudEvent"); - System.out.println(asPojoMessage.apply(binaryCloudEventMessage)); + System.out.println(asPojoMessage.apply(inputMessage)); } } @@ -89,9 +94,11 @@ public class CloudeventDemoApplicationFunctionTests { try(ConfigurableApplicationContext context = new SpringApplicationBuilder(CloudeventDemoApplication.class) .web(WebApplicationType.NONE).run()) { FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - Message binaryCloudEventMessage = MessageBuilder - .withPayload("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") - .copyHeaders(CloudEventMessageUtils.get("spring.io/spring-event", "com.example.springevent")) + + Message inputMessage = CloudEventMessageBuilder + .withData("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") + .setSource("https://spring.io/spring-event") + .setType("com.example.springevent") .build(); /* @@ -100,7 +107,7 @@ public class CloudeventDemoApplicationFunctionTests { * inside spring-cloud-function. */ Function, Message> asPojoMessage = catalog.lookup("consumeAndProduceCloudEventAsPojoToPojo"); - System.out.println(asPojoMessage.apply(binaryCloudEventMessage)); + System.out.println(asPojoMessage.apply(inputMessage)); } } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java index d5165e013..493e08c8e 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java @@ -33,7 +33,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.beans.factory.ObjectProvider; -import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.context.message.MessageUtils; @@ -218,10 +217,10 @@ public class RequestProcessor { } private boolean isValidCloudEvent(Set headerKeys) { - return headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID) - && headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE) - && headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE) - && headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION); + return headerKeys.contains("ce-id") + && headerKeys.contains("ce-source") + && headerKeys.contains("ce-type") + && headerKeys.contains("ce-specversion"); } // this seem to be very relevant to AWS container tests