Restructure Cloud Events support to optionally support Cloud Events SDK

This commit is contained in:
Oleg Zhurakousky
2020-11-30 15:54:50 +01:00
parent 70fbcec586
commit 306da4248a
17 changed files with 751 additions and 703 deletions

View File

@@ -1,187 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.StringUtils;
/**
* Utility class to assist with accessing and setting Cloud Events attributes from {@link MessageHeaders}.
* <br><br>
* It is effectively a wrapper over {@link MessageHeaders} which is a {@link Map}.
* It also provides best effort to both discover the actual attribute name (regardless of the prefix)
* as well as set appropriate attribute name.
* <br><br>
* For example, If there is an attribute `ce-source` or `ce_source` or 'source`, by simply calling getSource()
* we'll discover it and will return its value.
* <br>
* Similar effort will happen during the setting of the attribute. If you provide {@link #prefixToUse} we will
* use it otherwise we'll attempt to determine based on current execution context which prefix to use.
*
* @author Oleg Zhurakousky
* @author Dave Syer
*
* @since 3.1
*/
public class CloudEventAttributes extends HashMap<String, Object> {
/**
*
*/
private static final long serialVersionUID = 5393610770855366497L;
private final String prefixToUse;
public CloudEventAttributes(Map<String, Object> headers, String prefixToUse) {
super(headers);
this.prefixToUse = prefixToUse;
}
public CloudEventAttributes(Map<String, Object> headers) {
this(headers, null);
}
public CloudEventAttributes setId(String id) {
this.setAttribute(CloudEventMessageUtils.ID, id);
return this;
}
public <A> A getId() {
A id = this.getAttribute(CloudEventMessageUtils.ID);
if (id instanceof UUID) {
id = null;
}
return id;
}
public CloudEventAttributes setSource(String source) {
this.setAttribute(CloudEventMessageUtils.SOURCE, source);
return this;
}
public <A> A getSource() {
return this.getAttribute(CloudEventMessageUtils.SOURCE);
}
public CloudEventAttributes setSpecversion(String specversion) {
this.setAttribute(CloudEventMessageUtils.SPECVERSION, specversion);
return this;
}
public <A> A getSpecversion() {
return this.getAttribute(CloudEventMessageUtils.SPECVERSION);
}
public CloudEventAttributes setType(String type) {
this.setAttribute(CloudEventMessageUtils.TYPE, type);
return this;
}
public <A> A getType() {
return this.getAttribute(CloudEventMessageUtils.TYPE);
}
public CloudEventAttributes setDataContentType(String datacontenttype) {
this.setAttribute(CloudEventMessageUtils.DATACONTENTTYPE, datacontenttype);
return this;
}
public <A> A getDataContentType() {
return this.getAttribute(CloudEventMessageUtils.DATACONTENTTYPE);
}
public CloudEventAttributes setDataSchema(String dataschema) {
this.setAttribute(CloudEventMessageUtils.DATASCHEMA, dataschema);
return this;
}
public <A> A getDataSchema() {
return this.getAttribute(CloudEventMessageUtils.DATASCHEMA);
}
public CloudEventAttributes setSubject(String subject) {
this.setAttribute(CloudEventMessageUtils.SUBJECT, subject);
return this;
}
public <A> A getSubect() {
return this.getAttribute(CloudEventMessageUtils.SUBJECT);
}
public CloudEventAttributes setTime(String time) {
this.setAttribute(CloudEventMessageUtils.TIME, time);
return this;
}
public <A> A getTime() {
return this.getAttribute(CloudEventMessageUtils.TIME);
}
/**
* Will delegate to the underlying {@link Map} returning the value for the requested attribute or null.
*/
@SuppressWarnings("unchecked")
public <A> A getAttribute(String attrName) {
if (this.containsKey(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attrName)) {
return (A) this.get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attrName);
}
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attrName)) {
return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attrName);
}
return (A) this.get(attrName);
}
/**
* Determines if this instance of {@link CloudEventAttributes} represents valid Cloud Event.
* This implies that it contains all 4 required attributes (id, source, type & specversion)
*
* @return true if this instance represents a valid Cloud Event
*/
public boolean isValidCloudEvent() {
return StringUtils.hasText(this.getId())
&& StringUtils.hasText(this.getSource())
&& StringUtils.hasText(this.getSpecversion())
&& StringUtils.hasText(this.getType());
}
String getAttributeName(String attributeName) {
if (this.containsKey(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attributeName)) {
return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attributeName;
}
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName)) {
return CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName;
}
return attributeName;
}
private CloudEventAttributes setAttribute(String attrName, String attrValue) {
if (StringUtils.hasText(this.prefixToUse)) {
this.remove(this.getAttributeName(attrName));
this.put(this.prefixToUse + attrName, attrValue);
}
else {
this.put(this.getAttributeName(attrName), attrValue);
}
return this;
}
}

View File

@@ -1,48 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
/**
* Strategy that should be implemented by the user to help with outgoing Cloud Event attributes.
* <br><br>
* The provided `attributes` are already initialized with default values, so you can only set the ones that you need.
* <br>
* Once implemented, simply configure it as a bean and the framework will invoke it before the outbound Cloud Event Message is finalized.
*
* <pre>{@code
* @Bean
* public CloudEventAttributesProvider cloudEventAttributesProvider() {
* return attributes -> {
* attributes.setSource("https://interface21.com/").setType("com.interface21");
* };
* }}
* </pre>
*
* @author Oleg Zhurakousky
* @author Dave Syer
*
* @since 3.1
*/
@FunctionalInterface
public interface CloudEventAttributesProvider {
/**
*
* @param attributes instance of {@link CloudEventAttributes}
*/
void generateDefaultCloudEventHeaders(CloudEventAttributes attributes);
}

View File

@@ -0,0 +1,54 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
/**
* Strategy that should be implemented by the user to help with outgoing Cloud Event
* headers. <br>
* <br>
* NOTE: The provided instance of {@link CloudEventMessageBuilder} may or may not be initialized
* with default values, so it is the responsibility of the user to ensure that all required Cloud Events
* attributes are set. That said, Spring frameworks which utilize this interface
* will ensure that the provided {@link CloudEventMessageBuilder} is initialized with default values, leaving
* you responsible to only set the attributes you need. <br>
* Once implemented, simply configure it as a bean and the framework will invoke it before
* the outbound Cloud Event Message is finalized.
*
* <pre>
* &#64;Bean
* public CloudEventHeadersProvider cloudEventHeadersProvider() {
* return attributes -&gt;
* CloudEventHeaderUtils.fromAttributes(attributes).withSource("https://interface21.com/").withType("com.interface21").build();
* }
* </pre>
*
* @author Oleg Zhurakousky
* @author Dave Syer
* @since 2.0
*/
@FunctionalInterface
public interface CloudEventHeaderEnricher {
/**
* @param attributes instance of {@link CloudEventContext}
* @return modified {@link CloudEventContext}
*/
CloudEventMessageBuilder<?> enrich(CloudEventMessageBuilder<?> messageBuilder);
}

View File

@@ -0,0 +1,196 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
import java.net.URI;
import java.time.OffsetTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
/**
* Message builder which is aware of Cloud Event semantics.
*
* @author Oleg Zhurakousky
* @since 3.1
*/
public final class CloudEventMessageBuilder<T> {
protected Log logger = LogFactory.getLog(this.getClass());
private final Map<String, Object> headers;
private T data;
private CloudEventMessageBuilder(Map<String, Object> headers) {
this.headers = headers == null ? new HashMap<>() : headers;
}
public static <T> CloudEventMessageBuilder<T> withData(T data) {
CloudEventMessageBuilder<T> builder = new CloudEventMessageBuilder<T>(null);
builder.data = data;
return builder;
}
@SuppressWarnings("unchecked")
public static <T> CloudEventMessageBuilder<T> fromMessage(Message<?> message) {
CloudEventMessageBuilder<T> builder = new CloudEventMessageBuilder<T>(new HashMap<>(message.getHeaders()));
builder.data = (T) message.getPayload();
return builder;
}
public CloudEventMessageBuilder<T> setId(String id) {
this.headers.put(CloudEventMessageUtils.ID, id);
return this;
}
public CloudEventMessageBuilder<T> setSource(URI uri) {
this.headers.put(CloudEventMessageUtils.SOURCE, uri);
return this;
}
public CloudEventMessageBuilder<T> setSource(String uri) {
this.headers.put(CloudEventMessageUtils.SOURCE, URI.create(uri));
return this;
}
public CloudEventMessageBuilder<T> setSpecVersion(String specversion) {
this.headers.put(CloudEventMessageUtils.SPECVERSION, specversion);
return this;
}
public CloudEventMessageBuilder<T> setType(String type) {
this.headers.put(CloudEventMessageUtils.TYPE, type);
return this;
}
public CloudEventMessageBuilder<T> setDataContentType(String dataContentType) {
this.headers.put(CloudEventMessageUtils.DATACONTENTTYPE, dataContentType);
return this;
}
public CloudEventMessageBuilder<T> setDataSchema(URI dataSchema) {
this.headers.put(CloudEventMessageUtils.DATASCHEMA, dataSchema);
return this;
}
public CloudEventMessageBuilder<T> setDataSchema(String dataSchema) {
this.headers.put(CloudEventMessageUtils.DATASCHEMA, URI.create(dataSchema));
return this;
}
public CloudEventMessageBuilder<T> setSubject(String subject) {
this.headers.put(CloudEventMessageUtils.SUBJECT, subject);
return this;
}
public CloudEventMessageBuilder<T> copyHeaders(Map<String, Object> headers) {
this.headers.putAll(headers);
return this;
}
public CloudEventMessageBuilder<T> setTime(OffsetTime time) {
this.headers.put(CloudEventMessageUtils.TIME, time);
return this;
}
public CloudEventMessageBuilder<T> setTime(String time) {
this.headers.put(CloudEventMessageUtils.TIME, OffsetTime.parse(time));
return this;
}
public CloudEventMessageBuilder<T> setHeader(String key, Object value) {
this.headers.put(key, value);
return this;
}
/**
* Returns a snapshot of the headers {@link Map} at the time this method is called.
* The returned Map is read-only.
*
* @return map of headers
*/
public Map<String, Object> toHeadersMap() {
return Collections.unmodifiableMap(this.headers);
}
public Message<T> build() {
if (!this.headers.containsKey(CloudEventMessageUtils.SPECVERSION)) {
this.headers.put(CloudEventMessageUtils.SPECVERSION, "1.0");
}
return this.doBuild();
}
public Message<T> build(String attributePrefixToUse) {
String[] keys = this.headers.keySet().toArray(new String[] {});
for (String key : keys) {
Object value = this.headers.remove(key);
this.headers.put(attributePrefixToUse + key, value);
}
if (!this.headers.containsKey(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION)) {
this.headers.put(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION, "1.0");
}
return build();
}
private Message<T> doBuild() {
this.headers.put("message-type", "cloudevent");
CloudEventMessageHeaders headers = new CloudEventMessageHeaders(this.headers, this.getUUID(), null);
GenericMessage<T> message = new GenericMessage<T>(data, headers);
return message;
}
private UUID getUUID() {
UUID id = null;
if (this.headers.containsKey(CloudEventMessageUtils.ID)) {
String stringId = this.headers.get(CloudEventMessageUtils.ID).toString();
try {
id = UUID.fromString(stringId);
System.out.println(stringId);
System.out.println(id.toString());
}
catch (Exception e) {
logger.info("Provided Cloud Event 'id' is not compatible with Message 'id' which is UUID, "
+ "therefore Cloud Event 'id' will be written as '_id' message header");
this.headers.put("_" + CloudEventMessageUtils.ID, stringId);
this.headers.remove(CloudEventMessageUtils.ID);
}
}
return id;
}
private static class CloudEventMessageHeaders extends MessageHeaders {
/**
*
*/
private static final long serialVersionUID = -6424866731588545945L;
protected CloudEventMessageHeaders(Map<String, Object> headers, UUID id, Long timestamp) {
super(headers, id, timestamp);
}
}
}

View File

@@ -16,10 +16,12 @@
package org.springframework.cloud.function.cloudevent;
import java.util.HashMap;
import java.lang.reflect.Field;
import java.net.URI;
import java.time.OffsetTime;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@@ -27,19 +29,19 @@ import org.springframework.messaging.converter.ContentTypeResolver;
import org.springframework.messaging.converter.DefaultContentTypeResolver;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
/**
* Miscellaneous utility methods to deal with Cloud Events - https://cloudevents.io/.
* <br>
* Primarily intended for the internal use within the framework;
* Miscellaneous utility methods to assist with representing Cloud Event as Spring
* {@link Message}. <br>
* Primarily intended for the internal use within Spring-based frameworks and
* integrations.
*
* @author Oleg Zhurakousky
* @author Dave Syer
*
* @since 3.1
*/
public final class CloudEventMessageUtils {
@@ -61,280 +63,254 @@ public final class CloudEventMessageUtils {
public static MimeType APPLICATION_CLOUDEVENTS = MimeTypeUtils.parseMimeType(APPLICATION_CLOUDEVENTS_VALUE);
/**
* Default attributes prefix which also suits Kafka.
* Prefix for attributes.
*/
public static String DEFAULT_ATTR_PREFIX = "ce_";
/**
* HTTP attributes prefix.
*/
public static String HTTP_ATTR_PREFIX = "ce-";
/**
* AMQP attributes prefix.
*/
public static String AMQP_ATTR_PREFIX = "cloudEvents:";
/**
* Prefix for attributes.
*/
public static String HTTP_ATTR_PREFIX = "ce-";
/**
* Value for 'data' attribute.
*/
public static String DATA = "data";
/**
* Value for 'data' attribute with prefix.
*/
public static String CANONICAL_DATA = DEFAULT_ATTR_PREFIX + DATA;
/**
* Value for 'id' attribute.
*/
public static String ID = "id";
/**
* Value for 'id' attribute with prefix.
*/
public static String CANONICAL_ID = DEFAULT_ATTR_PREFIX + ID;
/**
* Value for 'source' attribute.
*/
public static String SOURCE = "source";
/**
* Value for 'source' attribute with prefix.
*/
public static String CANONICAL_SOURCE = DEFAULT_ATTR_PREFIX + SOURCE;
/**
* Value for 'specversion' attribute.
*/
public static String SPECVERSION = "specversion";
/**
* Value for 'specversion' attribute with prefix.
*/
public static String CANONICAL_SPECVERSION = DEFAULT_ATTR_PREFIX + SPECVERSION;
/**
* Value for 'type' attribute.
*/
public static String TYPE = "type";
/**
* Value for 'type' attribute with prefix.
*/
public static String CANONICAL_TYPE = DEFAULT_ATTR_PREFIX + TYPE;
/**
* Value for 'datacontenttype' attribute.
*/
public static String DATACONTENTTYPE = "datacontenttype";
/**
* Value for 'datacontenttype' attribute with prefix.
*/
public static String CANONICAL_DATACONTENTTYPE = DEFAULT_ATTR_PREFIX + DATACONTENTTYPE;
/**
* Value for 'dataschema' attribute.
*/
public static String DATASCHEMA = "dataschema";
/**
* Value for 'dataschema' attribute with prefix.
* V03 name for 'dataschema' attribute.
*/
public static String CANONICAL_DATASCHEMA = DEFAULT_ATTR_PREFIX + DATASCHEMA;
public static final String SCHEMAURL = "schemaurl";
/**
* Value for 'subject' attribute.
*/
public static String SUBJECT = "subject";
/**
* Value for 'subject' attribute with prefix.
*/
public static String CANONICAL_SUBJECT = DEFAULT_ATTR_PREFIX + SUBJECT;
/**
* Value for 'time' attribute.
*/
public static String TIME = "time";
/**
* Value for 'time' attribute with prefix.
*/
public static String CANONICAL_TIME = DEFAULT_ATTR_PREFIX + TIME;
/**
* Checks if {@link Message} represents cloud event in binary-mode.
*/
public static boolean isBinary(Map<String, Object> headers) {
CloudEventAttributes attributes = new CloudEventAttributes(headers);
return attributes.isValidCloudEvent();
public static String getId(Message<?> message) {
if (message.getHeaders().containsKey("_id")) {
return (String) message.getHeaders().get("_id");
}
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + MessageHeaders.ID);
}
/**
* Will construct instance of {@link CloudEventAttributes} setting its required attributes.
*
* @param ce_id value for Cloud Event 'id' attribute
* @param ce_specversion value for Cloud Event 'specversion' attribute
* @param ce_source value for Cloud Event 'source' attribute
* @param ce_type value for Cloud Event 'type' attribute
* @return instance of {@link CloudEventAttributes}
*/
public static CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type) {
Assert.hasText(ce_id, "'ce_id' must not be null or empty");
Assert.hasText(ce_specversion, "'ce_specversion' must not be null or empty");
Assert.hasText(ce_source, "'ce_source' must not be null or empty");
Assert.hasText(ce_type, "'ce_type' must not be null or empty");
Map<String, Object> requiredAttributes = new HashMap<>();
requiredAttributes.put(CloudEventMessageUtils.CANONICAL_ID, ce_id);
requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SPECVERSION, ce_specversion);
requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SOURCE, ce_source);
requiredAttributes.put(CloudEventMessageUtils.CANONICAL_TYPE, ce_type);
return new CloudEventAttributes(requiredAttributes);
public static URI getSource(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return safeGetURI(message.getHeaders(), prefix + SOURCE);
}
/**
* Will construct instance of {@link CloudEventAttributes}
* Should default/generate cloud event ID and SPECVERSION.
*
* @param ce_source value for Cloud Event 'source' attribute
* @param ce_type value for Cloud Event 'type' attribute
* @return instance of {@link CloudEventAttributes}
*/
public static CloudEventAttributes get(String ce_source, String ce_type) {
return get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type);
public static String getSpecVersion(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + SPECVERSION);
}
public static String getType(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + TYPE);
}
public static String getDataContentType(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + DATACONTENTTYPE);
}
public static URI getDataSchema(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return safeGetURI(message.getHeaders(), prefix + DATASCHEMA);
}
public static String getSubject(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + SUBJECT);
}
public static OffsetTime getTime(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (OffsetTime) message.getHeaders().get(prefix + TIME);
}
/**
* Will attempt to convert 'inputMessage' to a binary-mode Cloud Event {@link Message}.
* This typically happens when 'inputMessage' represents Cloud Event in structured-mode.
* <br>
* In the event the message already represents Cloud Event in binary-mode, or this
* message does not represent Cloud Event at all, it will be returned unchanged.
*
* @param inputMessage instance of incoming {@link Message}
* @param messageConverter instance of {@link MessageConverter} to assist with type conversion.
* @return instance of {@link Message} representing Cloud Event in binary-mode or unchanged 'inputMessage'.
*/
@SuppressWarnings("unchecked")
public static Message<?> toBinary(Message<?> inputMessage, MessageConverter messageConverter) {
protected static Message<?> toCannonical(Message<?> inputMessage, MessageConverter messageConverter) {
Map<String, Object> headers = inputMessage.getHeaders();
CloudEventAttributes attributes = new CloudEventAttributes(headers);
Field headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
headersField.setAccessible(true);
Map<String, Object> headers = (Map<String, Object>) ReflectionUtils.getField(headersField, inputMessage.getHeaders());
canonicalizeHeaders(headers);
String inputContentType = (String) inputMessage.getHeaders().get(DATACONTENTTYPE);
// first check the obvious and see if content-type is `cloudevents`
if (!attributes.isValidCloudEvent() && headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
MimeType contentType = resolveContentType(inputMessage.getHeaders());
if (contentType != null && contentType.getType().equals(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType())
&& contentType.getSubtype().startsWith(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype())) {
if (!isBinary(inputMessage) && headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders());
if (contentType.getType().equals(APPLICATION_CLOUDEVENTS.getType()) && contentType
.getSubtype().startsWith(APPLICATION_CLOUDEVENTS.getSubtype())) {
String dataContentType = StringUtils.hasText(attributes.getDataContentType())
? attributes.getDataContentType()
String dataContentType = StringUtils.hasText(inputContentType) ? inputContentType
: MimeTypeUtils.APPLICATION_JSON_VALUE;
String suffix = contentType.getSubtypeSuffix();
Assert.hasText(suffix, "Content-type 'suffix' can not be determined from " + contentType);
MimeType cloudEventDeserializationContentType = MimeTypeUtils
.parseMimeType(contentType.getType() + "/" + suffix);
Message<?> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)
.setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType)
.setHeader(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, dataContentType)
.build();
Map<String, Object> structuredCloudEvent = (Map<String, Object>) messageConverter.fromMessage(cloudEventMessage, Map.class);
Message<?> binaryCeMessage = buildCeMessageFromStructured(structuredCloudEvent, inputMessage.getHeaders());
.setHeader(DATACONTENTTYPE, dataContentType).build();
Map<String, Object> structuredCloudEvent = (Map<String, Object>) messageConverter
.fromMessage(cloudEventMessage, Map.class);
canonicalizeHeaders(structuredCloudEvent);
Message<?> binaryCeMessage = buildBinaryMessageFromStructuredMap(structuredCloudEvent,
inputMessage.getHeaders());
return binaryCeMessage;
}
}
else if (StringUtils.hasText(attributes.getDataContentType())) {
return MessageBuilder.fromMessage(inputMessage)
.setHeader(MessageHeaders.CONTENT_TYPE, attributes.getDataContentType())
.build();
else if (StringUtils.hasText(inputContentType)) { // this needs thinking since . .
return MessageBuilder.fromMessage(inputMessage).setHeader(MessageHeaders.CONTENT_TYPE, inputContentType)
.build();
}
return inputMessage;
}
private static MimeType resolveContentType(MessageHeaders headers) {
try {
return contentTypeResolver.resolve(headers);
}
catch (Exception e) {
// ignore
}
return null;
}
/**
* Will attempt to determine based on the headers the origin of Message (e.g., HTTP, Kafka etc)
* and based on this designate prefix to be used for Cloud Events attributes (i.e., `ce-` or `ce_` etc).
* Determines attribute prefix based on the presence of certain well defined headers.
*
* @param messageHeaders instance of {@link MessageHeaders}
* @return prefix to be used for Cloud Events attributes
* TODO work in progress as it needs to be refined
*
* @param messageHeaders map of message headers
* @return prefix (e.g., 'ce_' or 'ce-' etc.)
*/
public static String determinePrefixToUse(MessageHeaders messageHeaders) {
protected static String determinePrefixToUse(Map<String, Object> messageHeaders) {
Set<String> keys = messageHeaders.keySet();
if (keys.contains("user-agent")) {
return CloudEventMessageUtils.HTTP_ATTR_PREFIX;
}
else if (keys.contains("amqp")) {
return CloudEventMessageUtils.AMQP_ATTR_PREFIX;
return HTTP_ATTR_PREFIX;
}
else {
return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; // default which also suits Kafka 'ce_'
for (String key : messageHeaders.keySet()) {
if (key.startsWith("kafka_")) {
return DEFAULT_ATTR_PREFIX;
}
else if (key.startsWith("amqp_")) {
return AMQP_ATTR_PREFIX;
}
else if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
return DEFAULT_ATTR_PREFIX;
}
else if (key.startsWith(HTTP_ATTR_PREFIX)) {
return HTTP_ATTR_PREFIX;
}
else if (key.startsWith(AMQP_ATTR_PREFIX)) {
return AMQP_ATTR_PREFIX;
}
}
}
return "";
}
/**
* Typically called by Consumer.
* Will check for the existence of required attributes. Assumes attributes (headers)
* are in canonical form.
* @param message input {@link Message}
* @return true if this Message represents Cloud Event in binary-mode
*/
public static CloudEventAttributes generateAttributes(Message<?> message, CloudEventAttributesProvider provider) {
CloudEventAttributes attributes = generateDefaultAttributeValues(new CloudEventAttributes(message.getHeaders()),
message.getPayload().getClass().getName().getClass().getName(), message.getPayload().getClass().getName().getClass().getName());
provider.generateDefaultCloudEventHeaders(attributes);
return attributes;
protected static boolean isBinary(Message<?> message) {
return message.getHeaders().containsKey(SPECVERSION)
&& message.getHeaders().containsKey(TYPE)
&& message.getHeaders().containsKey(SOURCE);
}
public static CloudEventAttributes generateAttributes(Message<?> inputMessage, String typeName, String sourceName) {
CloudEventAttributes attributes = new CloudEventAttributes(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage.getHeaders()));
return generateDefaultAttributeValues(attributes, sourceName, typeName);
/**
* Will canonicalize Cloud Event attributes (headers) by removing well known prefixes.
* So, for example 'ce_source' will become 'source'.
* @param headers message headers
*/
private static void canonicalizeHeaders(Map<String, Object> headers) {
String[] keys = headers.keySet().toArray(new String[] {});
for (String key : keys) {
if (key.startsWith(HTTP_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(HTTP_ATTR_PREFIX.length());
headers.put(key, value);
}
else if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(DEFAULT_ATTR_PREFIX.length());
headers.put(key, value);
}
else if (key.startsWith(AMQP_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(AMQP_ATTR_PREFIX.length());
headers.put(key, value);
}
}
}
private static Message<?> buildCeMessageFromStructured(Map<String, Object> structuredCloudEvent, MessageHeaders originalHeaders) {
String prefixToUse = determinePrefixToUse(originalHeaders);
Object data = null;
if (structuredCloudEvent.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA)) {
data = structuredCloudEvent.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA);
structuredCloudEvent.remove(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA);
private static Message<?> buildBinaryMessageFromStructuredMap(Map<String, Object> structuredCloudEvent,
MessageHeaders originalHeaders) {
Object payload = structuredCloudEvent.remove(DATA);
if (payload == null) {
payload = Collections.emptyMap();
}
else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.CANONICAL_DATA)) {
data = structuredCloudEvent.get(CloudEventMessageUtils.CANONICAL_DATA);
structuredCloudEvent.remove(CloudEventMessageUtils.CANONICAL_DATA);
CloudEventMessageBuilder<?> messageBuilder = CloudEventMessageBuilder
.withData(payload)
.copyHeaders(structuredCloudEvent);
for (String key : originalHeaders.keySet()) {
if (!MessageHeaders.ID.equals(key)) {
messageBuilder.setHeader(key, originalHeaders.get(key));
}
}
else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.DATA)) {
data = structuredCloudEvent.get(CloudEventMessageUtils.DATA);
structuredCloudEvent.remove(CloudEventMessageUtils.DATA);
}
Assert.notNull(data, "'data' must not be null");
MessageBuilder<?> builder = MessageBuilder.withPayload(data);
CloudEventAttributes attributes = new CloudEventAttributes(structuredCloudEvent);
builder.setHeader(prefixToUse + CloudEventMessageUtils.ID, attributes.getId());
builder.setHeader(prefixToUse + CloudEventMessageUtils.SOURCE, attributes.getSource());
builder.setHeader(prefixToUse + CloudEventMessageUtils.TYPE, attributes.getType());
builder.setHeader(prefixToUse + CloudEventMessageUtils.SPECVERSION, attributes.getSpecversion());
builder.copyHeaders(originalHeaders);
return builder.build();
return messageBuilder.build();
}
private static CloudEventAttributes generateDefaultAttributeValues(CloudEventAttributes attributes, String source, String type) {
if (attributes.isValidCloudEvent()) {
return attributes
.setSpecversion("1.0")
.setId(UUID.randomUUID().toString())
.setType(type)
.setSource(source);
private static URI safeGetURI(Map<String, Object> map, String key) {
Object uri = map.get(key);
if (uri != null && uri instanceof String) {
uri = URI.create((String) uri);
}
return attributes;
return (URI) uri;
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;
/**
*
* @author Oleg Zhurakousky
* @since 3.1
*/
@Configuration(proxyBeanMethods = false)
class CloudEventsFunctionExtensionConfiguration {
@Bean
@ConditionalOnMissingClass("io.cloudevents.CloudEvent")
public CloudEventsFunctionInvocationHelper nativeFunctionInvocationHelper(@Nullable CloudEventHeaderEnricher cloudEventHeadersProvider) {
return new CloudEventsFunctionInvocationHelper(cloudEventHeadersProvider);
}
@Bean
@ConditionalOnClass(name = "io.cloudevents.CloudEvent")
public CloudEventsFunctionInvocationHelper sdkFunctionInvocationHelper() {
// TODO you may need SDKs header provider
return null;
}
}

View File

@@ -0,0 +1,101 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
import java.net.URI;
import java.util.UUID;
import org.springframework.beans.BeansException;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
/**
*
* @author Oleg Zhurakousky
* @since 2.0
*
*/
class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Message<?>>, ApplicationContextAware {
private ConfigurableApplicationContext applicationContext;
private final CloudEventHeaderEnricher cloudEventAttributesProvider;
CloudEventsFunctionInvocationHelper(@Nullable CloudEventHeaderEnricher cloudEventHeadersProvider) {
this.cloudEventAttributesProvider = cloudEventHeadersProvider;
}
@Override
public boolean isRetainOuputAsMessage(Message<?> message) {
if (message.getHeaders().containsKey("message-type") && message.getHeaders().get("message-type").equals("cloudevent")) {
return true;
}
return false;
}
@Override
public Message<?> preProcessInput(Message<?> input, Object inputConverter) {
return CloudEventMessageUtils.toCannonical(input, (MessageConverter) inputConverter);
}
@Override
public Message<?> postProcessResult(Message<?> input, Object result) {
Message<?> resultMessage = null;
if (CloudEventMessageUtils.isBinary(input)) {
CloudEventMessageBuilder<?> messageBuilder = CloudEventMessageBuilder
.withData(result)
.setId(UUID.randomUUID().toString())
.setSource(URI.create("http://spring.io/" + getApplicationName()))
.setType(result.getClass().getName());
if (this.cloudEventAttributesProvider != null) {
messageBuilder = this.cloudEventAttributesProvider.enrich(messageBuilder);
}
String prefix = this.determineOutputPrefix(input);
resultMessage = messageBuilder.build(prefix);
}
else {
resultMessage = MessageBuilder.withPayload(result).build();
}
return resultMessage;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
private String determineOutputPrefix(Message<?> input) {
//TODO rework to actually figure out where output goes instead of relying on input
return CloudEventMessageUtils.determinePrefixToUse(input.getHeaders());
}
private String getApplicationName() {
ConfigurableEnvironment environment = this.applicationContext.getEnvironment();
String name = environment.getProperty("spring.application.name");
return (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId());
}
}

View File

@@ -20,7 +20,6 @@ import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -32,26 +31,22 @@ import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
import org.springframework.cloud.function.cloudevent.CloudEventAttributes;
import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
/**
* Implementation of {@link FunctionRegistry} capable of discovering functioins in
* {@link BeanFactory}.
* Implementation of {@link FunctionRegistry} capable of discovering functioins in {@link BeanFactory}.
*
* @author Oleg Zhurakousky
*/
@@ -59,19 +54,14 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
private GenericApplicationContext applicationContext;
private CloudEventAttributesProvider cloudEventAtttributesProvider;
public BeanFactoryAwareFunctionRegistry(ConversionService conversionService,
CompositeMessageConverter messageConverter, JsonMapper jsonMapper) {
super(conversionService, messageConverter, jsonMapper);
public BeanFactoryAwareFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter,
JsonMapper jsonMapper, @Nullable FunctionInvocationHelper<Message<?>> functionInvocationHelper) {
super(conversionService, messageConverter, jsonMapper, functionInvocationHelper);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (GenericApplicationContext) applicationContext;
if (applicationContext.getBeanNamesForType(CloudEventAttributesProvider.class).length > 0) {
this.cloudEventAtttributesProvider = applicationContext.getBean(CloudEventAttributesProvider.class);
}
}
/*
@@ -80,9 +70,10 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
*/
@Override
public int size() {
return this.applicationContext.getBeanNamesForType(Supplier.class).length
+ this.applicationContext.getBeanNamesForType(Function.class).length
+ this.applicationContext.getBeanNamesForType(Consumer.class).length + super.size();
return this.applicationContext.getBeanNamesForType(Supplier.class).length +
this.applicationContext.getBeanNamesForType(Function.class).length +
this.applicationContext.getBeanNamesForType(Consumer.class).length +
super.size();
}
/*
@@ -92,9 +83,12 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
public Set<String> getNames(Class<?> type) {
Set<String> registeredNames = super.getNames(type);
if (type == null) {
registeredNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Function.class)));
registeredNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Supplier.class)));
registeredNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Consumer.class)));
registeredNames
.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Function.class)));
registeredNames
.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Supplier.class)));
registeredNames
.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Consumer.class)));
}
else {
registeredNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(type)));
@@ -105,8 +99,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public <T> T lookup(Class<?> type, String functionDefinition, String... expectedOutputMimeTypes) {
functionDefinition = StringUtils.hasText(functionDefinition) ? functionDefinition
: this.applicationContext.getEnvironment().getProperty(FunctionProperties.FUNCTION_DEFINITION, "");
functionDefinition = StringUtils.hasText(functionDefinition)
? functionDefinition
: this.applicationContext.getEnvironment().getProperty(FunctionProperties.FUNCTION_DEFINITION, "");
functionDefinition = this.normalizeFunctionDefinition(functionDefinition);
if (!StringUtils.hasText(functionDefinition)) {
@@ -117,8 +112,7 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
if (function == null) {
Set<String> functionRegistratioinNames = super.getNames(null);
String[] functionNames = StringUtils
.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");
String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");
for (String functionName : functionNames) {
if (functionRegistratioinNames.contains(functionName)) {
logger.info("Skipping function '" + functionName + "' since it is already present");
@@ -132,31 +126,26 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
functionRegistration = (FunctionRegistration) functionCandidate;
}
else if (this.isFunctionPojo(functionCandidate, functionName)) {
Method functionalMethod = FunctionTypeUtils
.discoverFunctionalMethod(functionCandidate.getClass());
Method functionalMethod = FunctionTypeUtils.discoverFunctionalMethod(functionCandidate.getClass());
functionCandidate = this.proxyTarget(functionCandidate, functionalMethod);
functionType = FunctionTypeUtils.fromFunctionMethod(functionalMethod);
}
else if (this.isSpecialFunctionRegistration(functionNames, functionName)) {
functionRegistration = this.applicationContext.getBean(
functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX,
FunctionRegistration.class);
functionRegistration = this.applicationContext
.getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class);
}
else {
functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName,
this.applicationContext);
functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext);
}
if (functionRegistration == null) {
functionRegistration = new FunctionRegistration(functionCandidate, functionName)
.type(functionType);
functionRegistration = new FunctionRegistration(functionCandidate, functionName).type(functionType);
}
this.register(functionRegistration);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Function '" + functionName
+ "' is not available in FunctionCatalog or BeanFactory");
logger.debug("Function '" + functionName + "' is not available in FunctionCatalog or BeanFactory");
}
}
}
@@ -164,36 +153,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes);
}
if (function != null) {
BiFunction<Message<?>, Object, Message<?>> invocationResultHeaderEnricher = new BiFunction<Message<?>, Object, Message<?>>() {
@Override
public Message<?> apply(Message<?> inputMessage, Object invocationResult) {
// TODO: Factor it out! Cloud Events specific code
CloudEventAttributes generatedCeHeaders = CloudEventMessageUtils.generateAttributes(inputMessage,
invocationResult.getClass().getName(), getApplicationName());
CloudEventAttributes attributes = new CloudEventAttributes(generatedCeHeaders,
CloudEventMessageUtils.determinePrefixToUse(inputMessage.getHeaders()));
if (cloudEventAtttributesProvider != null) {
cloudEventAtttributesProvider.generateDefaultCloudEventHeaders(attributes);
}
Message message = MessageBuilder.withPayload(invocationResult).copyHeaders(attributes).build();
return message;
}
};
function.setOutputMessageHeaderEnricher(invocationResultHeaderEnricher);
}
return (T) function;
}
private String getApplicationName() {
ConfigurableEnvironment environment = this.applicationContext.getEnvironment();
String name = environment.getProperty("spring.application.name");
return "http://spring.io/"
+ (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId());
}
private Object discoverFunctionInBeanFactory(String functionName) {
Object functionCandidate = null;
if (this.applicationContext.containsBean(functionName)) {
@@ -201,8 +163,7 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
}
else {
try {
functionCandidate = BeanFactoryAnnotationUtils
.qualifiedBeanOfType(this.applicationContext.getBeanFactory(), Object.class, functionName);
functionCandidate = BeanFactoryAnnotationUtils.qualifiedBeanOfType(this.applicationContext.getBeanFactory(), Object.class, functionName);
}
catch (Exception e) {
// ignore since there is no safe isAvailable-kind of method
@@ -217,19 +178,21 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
}
private boolean isFunctionPojo(Object functionCandidate, String functionName) {
return !functionCandidate.getClass().isSynthetic() && !(functionCandidate instanceof Supplier)
&& !(functionCandidate instanceof Function) && !(functionCandidate instanceof Consumer)
&& !this.applicationContext.containsBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX);
return !functionCandidate.getClass().isSynthetic()
&& !(functionCandidate instanceof Supplier)
&& !(functionCandidate instanceof Function)
&& !(functionCandidate instanceof Consumer)
&& !this.applicationContext.containsBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX);
}
/**
* At the moment 'special function registration' simply implies that a bean under the
* provided functionName may have already been wrapped and registered as
* FunuctionRegistration with BeanFactory under the name of the function suffixed with
* {@link FunctionRegistration#REGISTRATION_NAME_SUFFIX} (e.g.,
* 'myKotlinFunction_registration'). <br>
* <br>
* At the moment 'special function registration' simply implies that a bean under the provided functionName
* may have already been wrapped and registered as FunuctionRegistration with BeanFactory under the name of
* the function suffixed with {@link FunctionRegistration#REGISTRATION_NAME_SUFFIX}
* (e.g., 'myKotlinFunction_registration').
* <br><br>
* At the moment only Kotlin module does this
*
* @param functionCandidate candidate for FunctionInvocationWrapper instance
* @param functionName the name of the function
* @return true if this function candidate qualifies
@@ -250,5 +213,4 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
});
return pf.getProxy();
}
}

View File

@@ -31,7 +31,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -46,13 +45,12 @@ import reactor.util.function.Tuples;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.cloudevent.CloudEventAttributes;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.core.ResolvableType;
import org.springframework.core.convert.ConversionService;
@@ -98,10 +96,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
private final JsonMapper jsonMapper;
private final FunctionInvocationHelper<Message<?>> functionInvocationHelper;
@Autowired(required = false)
private FunctionAroundWrapper functionAroundWrapper;
public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper) {
public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper,
@Nullable FunctionInvocationHelper<Message<?>> functionInvocationHelper) {
Assert.notNull(messageConverter, "'messageConverter' must not be null");
Assert.notNull(jsonMapper, "'jsonMapper' must not be null");
this.conversionService = conversionService;
@@ -109,6 +110,11 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
this.messageConverter = messageConverter;
this.headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
this.headersField.setAccessible(true);
this.functionInvocationHelper = functionInvocationHelper;
}
public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper) {
this(conversionService, messageConverter, jsonMapper, null);
}
@SuppressWarnings("unchecked")
@@ -322,11 +328,11 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
*/
private Function<Object, Message> enhancer;
private BiFunction<Message<?>, Object, Message<?>> outputMessageHeaderEnricher;
void setOutputMessageHeaderEnricher(BiFunction<Message<?>, Object, Message<?>> outputMessageHeaderEnricher) {
this.outputMessageHeaderEnricher = outputMessageHeaderEnricher;
}
// private BiFunction<Message<?>, Object, Message<?>> outputMessageHeaderEnricher;
//
// void setOutputMessageHeaderEnricher(BiFunction<Message<?>, Object, Message<?>> outputMessageHeaderEnricher) {
// this.outputMessageHeaderEnricher = outputMessageHeaderEnricher;
// }
FunctionInvocationWrapper(FunctionInvocationWrapper function) {
this.target = function.target;
@@ -623,8 +629,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v));
}
else {
if (this.outputMessageHeaderEnricher != null) {
result = this.outputMessageHeaderEnricher.apply((Message<?>) input, result);
if (functionInvocationHelper != null) {
result = functionInvocationHelper.postProcessResult((Message<?>) input, result);
}
else {
result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build();
@@ -823,7 +829,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
return null;
}
input = CloudEventMessageUtils.toBinary((Message<?>) input, messageConverter);
if (functionInvocationHelper != null) {
input = functionInvocationHelper.preProcessInput((Message<?>) input, messageConverter);
}
convertedInput = this.convertInputMessageIfNecessary((Message) input, type);
if (convertedInput == null) { // give ConversionService a chance
@@ -910,7 +918,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
* case that requires it since it may contain forwarding url
*/
private boolean containsRetainMessageSignalInHeaders(Message message) {
if (new CloudEventAttributes(message.getHeaders()).isValidCloudEvent()) {
if (functionInvocationHelper != null && functionInvocationHelper.isRetainOuputAsMessage(message)) {
return true;
}
else {

View File

@@ -34,6 +34,7 @@ import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.function.json.GsonMapper;
import org.springframework.cloud.function.json.JacksonMapper;
import org.springframework.cloud.function.json.JsonMapper;
@@ -47,6 +48,8 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.core.convert.converter.GenericConverter;
import org.springframework.core.convert.support.ConfigurableConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
@@ -71,7 +74,8 @@ public class ContextFunctionCatalogAutoConfiguration {
static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper";
@Bean
public FunctionRegistry functionCatalog(List<MessageConverter> messageConverters, JsonMapper jsonMapper, ConfigurableApplicationContext context) {
public FunctionRegistry functionCatalog(List<MessageConverter> messageConverters, JsonMapper jsonMapper,
ConfigurableApplicationContext context, @Nullable FunctionInvocationHelper<Message<?>> functionInvocationHelper) {
ConfigurableConversionService conversionService = (ConfigurableConversionService) context.getBeanFactory().getConversionService();
Map<String, GenericConverter> converters = context.getBeansOfType(GenericConverter.class);
for (GenericConverter converter : converters.values()) {
@@ -105,7 +109,7 @@ public class ContextFunctionCatalogAutoConfiguration {
messageConverter = new SmartCompositeMessageConverter(mcList);
}
return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper);
return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper, functionInvocationHelper);
}
@Bean(RoutingFunction.FUNCTION_NAME)

View File

@@ -1,6 +1,7 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration
org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration,\
org.springframework.cloud.function.cloudevent.CloudEventsFunctionExtensionConfiguration
org.springframework.cloud.function.context.WrapperDetector=\
org.springframework.cloud.function.context.config.FluxWrapperDetector
org.springframework.context.ApplicationContextInitializer=\
org.springframework.cloud.function.context.config.ContextFunctionCatalogInitializer
org.springframework.cloud.function.context.config.ContextFunctionCatalogInitializer

View File

@@ -16,7 +16,9 @@
package org.springframework.cloud.function.cloudevent;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.UUID;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
@@ -29,7 +31,6 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
@@ -42,12 +43,50 @@ public class CloudEventFunctionTests {
@SuppressWarnings("unchecked")
@Test
public void testBinaryPojoToPojoDefaultOutputAttributeProvider() {
public void testBinaryPojoToPojoDefaultOutputHeaderProvider() {
Function<Object, Object> function = this.lookup("echo", TestConfiguration.class);
Message<String> inputMessage = MessageBuilder.withPayload("{\"name\":\"Ricky\"}")
.copyHeaders(CloudEventMessageUtils.get("https://spring.io/", "org.springframework")).build();
assertThat(CloudEventMessageUtils.isBinary(inputMessage.getHeaders())).isTrue();
String id = UUID.randomUUID().toString();
Message<String> inputMessage = CloudEventMessageBuilder
.withData("{\"name\":\"Ricky\"}")
.setId(id)
.setSource("https://spring.io/")
.setType("org.springframework")
.build();
assertThat(inputMessage.getHeaders().getId()).isEqualTo(UUID.fromString(id));
assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isTrue();
Message<Person> resultMessage = (Message<Person>) function.apply(inputMessage);
/*
* Validates that although user only deals with POJO, the framework recognizes
* both on input and output that it is dealing with Cloud Event and generates
* appropriate headers/attributes
*/
assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}
// this kind of emulates that message came from Kafka
@SuppressWarnings("unchecked")
@Test
public void testBinaryPojoToPojoDefaultOutputHeaderProviderWithPrefix() {
Function<Object, Object> function = this.lookup("echo", TestConfiguration.class);
String id = UUID.randomUUID().toString();
Message<String> inputMessage = CloudEventMessageBuilder
.withData("{\"name\":\"Ricky\"}")
.setHeader("ce_id", id)
.setHeader("ce_source", "https://spring.io/")
.setHeader("ce_type", "org.springframework")
.build();
// assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isTrue();
Message<Person> resultMessage = (Message<Person>) function.apply(inputMessage);
@@ -56,10 +95,9 @@ public class CloudEventFunctionTests {
* both on input and output that it is dealing with Cloud Event and generates
* appropriate headers/attributes
*/
CloudEventAttributes attributes = new CloudEventAttributes(resultMessage.getHeaders());
assertThat(attributes.isValidCloudEvent()).isTrue();
assertThat((String) attributes.getType()).isEqualTo(Person.class.getName());
assertThat((String) attributes.getSource()).isEqualTo("http://spring.io/application-application");
assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}
@SuppressWarnings("unchecked")
@@ -79,24 +117,25 @@ public class CloudEventFunctionTests {
"}";
Function<Object, Object> function = this.lookup("springRelease", TestConfiguration.class);
Message<String> inputMessage = MessageBuilder.withPayload(payload)
Message<String> inputMessage = CloudEventMessageBuilder
.withData(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json")
.build();
assertThat(CloudEventMessageUtils.isBinary(inputMessage.getHeaders())).isFalse();
assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isFalse();
Message<SpringReleaseEvent> resultMessage = (Message<SpringReleaseEvent>) function.apply(inputMessage);
assertThat(resultMessage.getPayload().getReleaseDate())
.isEqualTo(new SimpleDateFormat("dd-MM-yyyy").parse("01-10-2006"));
assertThat(resultMessage.getPayload().getVersion()).isEqualTo("2.0");
/*
* Validates that although user only deals with POJO, the framework recognizes
* both on input and output that it is dealing with Cloud Event and generates
* appropriate headers/attributes
*/
CloudEventAttributes attributes = new CloudEventAttributes(resultMessage.getHeaders());
assertThat(attributes.isValidCloudEvent()).isTrue();
assertThat((String) attributes.getType()).isEqualTo(SpringReleaseEvent.class.getName());
assertThat((String) attributes.getSource()).isEqualTo("http://spring.io/application-application");
// /*
// * Validates that although user only deals with POJO, the framework recognizes
// * both on input and output that it is dealing with Cloud Event and generates
// * appropriate headers/attributes
// */
assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}
@SuppressWarnings("unchecked")
@@ -115,10 +154,11 @@ public class CloudEventFunctionTests {
"}";
Function<Object, Object> function = this.lookup("springRelease", TestConfiguration.class);
Message<String> inputMessage = MessageBuilder.withPayload(payload)
Message<String> inputMessage = CloudEventMessageBuilder
.withData(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json")
.build();
assertThat(CloudEventMessageUtils.isBinary(inputMessage.getHeaders())).isFalse();
assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isFalse();
Message<SpringReleaseEvent> resultMessage = (Message<SpringReleaseEvent>) function.apply(inputMessage);
assertThat(resultMessage.getPayload().getReleaseDate())
@@ -129,10 +169,9 @@ public class CloudEventFunctionTests {
* both on input and output that it is dealing with Cloud Event and generates
* appropriate headers/attributes
*/
CloudEventAttributes attributes = new CloudEventAttributes(resultMessage.getHeaders());
assertThat(attributes.isValidCloudEvent()).isTrue();
assertThat((String) attributes.getType()).isEqualTo(SpringReleaseEvent.class.getName());
assertThat((String) attributes.getSource()).isEqualTo("http://spring.io/application-application");
assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}
private Function<Object, Object> lookup(String functionDefinition, Class<?>... configClass) {

View File

@@ -1,142 +0,0 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
import java.lang.reflect.Field;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ReflectionUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
*
* @author Oleg Zhurakousky
*
*/
public class CloudEventTypeConversionTests {
@Test
public void testFromMessageBinaryPayloadMatchesType() {
SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
CloudEventAttributes ceAttributes = CloudEventMessageUtils
.get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework");
ceAttributes.setDataContentType("text/plain");
Message<String> message = MessageBuilder.withPayload("Hello Ricky").copyHeaders(ceAttributes).build();
String converted = (String) messageConverter.fromMessage(message, String.class);
assertThat(converted).isEqualTo("Hello Ricky");
}
@Test
public void testFromMessageBinaryPayloadDoesNotMatchType() {
SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
CloudEventAttributes ceAttributes = CloudEventMessageUtils
.get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework");
Message<byte[]> message = MessageBuilder.withPayload("Hello Ricky".getBytes())
.copyHeaders(ceAttributes)
.setHeader(MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
.build();
String converted = (String) messageConverter.fromMessage(message, String.class);
assertThat(converted).isEqualTo("Hello Ricky");
}
@Test // JsonMessageConverter does some special things between byte[] and String so this works
public void testFromMessageBinaryPayloadNoDataContentTypeToString() {
SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
CloudEventAttributes ceAttributes = CloudEventMessageUtils
.get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework");
Message<byte[]> message = MessageBuilder.withPayload("Hello Ricky".getBytes())
.copyHeaders(ceAttributes)
.setHeader(MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
.build();
String converted = (String) messageConverter.fromMessage(message, String.class);
assertThat(converted).isEqualTo("Hello Ricky");
}
@Test // Unlike the previous test the type here is POJO so no special treatement
public void testFromMessageBinaryPayloadNoDataContentTypeToPOJO() {
SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework");
Message<byte[]> message = MessageBuilder.withPayload("Hello Ricky".getBytes())
.copyHeaders(ceAttributes)
.setHeader(MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
.build();
String converted = (String) messageConverter.fromMessage(message, Person.class);
assertThat(converted).isNull();
}
@Test // will fall on default CT which is json
public void testFromMessageBinaryPayloadNoDataContentTypeToPOJOThatWorks() {
SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework");
Message<byte[]> message = MessageBuilder.withPayload("{\"name\":\"Ricky\"}".getBytes())
.copyHeaders(ceAttributes)
.setHeader(MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.parseMimeType("application/cloudevents+json;charset=utf-8"))
.build();
Person converted = (Person) messageConverter.fromMessage(message, Person.class);
assertThat(converted.getName()).isEqualTo("Ricky");
}
private SmartCompositeMessageConverter configure(Class<?>... configClass) {
ApplicationContext context = new SpringApplicationBuilder(configClass).run(
"--logging.level.org.springframework.cloud.function=DEBUG", "--spring.main.lazy-initialization=true");
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
Field f = ReflectionUtils.findField(BeanFactoryAwareFunctionRegistry.class, "messageConverter");
f.setAccessible(true);
try {
SmartCompositeMessageConverter messageConverter = (SmartCompositeMessageConverter) f.get(catalog);
return messageConverter;
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
@EnableAutoConfiguration
@Configuration
public static class DummyConfiguration {
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}