GH-422 GH-606 Add support for simplifying message headers to attribute mapping

Added CloudEventAttributesProvider and default implementation
Added CloudEventMessageUtils
This commit is contained in:
Oleg Zhurakousky
2020-11-13 10:24:27 +01:00
parent a26ad928f6
commit f0b2ce7691
12 changed files with 438 additions and 69 deletions

View File

@@ -0,0 +1,84 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
import java.util.HashMap;
import java.util.Map;
/**
*
* @author Oleg Zhurakousky
* @since 3.1
*/
public class CloudEventAttributes extends HashMap<String, Object> {
/**
*
*/
private static final long serialVersionUID = 5393610770855366497L;
CloudEventAttributes(Map<String, Object> headers) {
super(headers);
}
@SuppressWarnings("unchecked")
public <A> A getId() {
return this.containsKey(CloudEventMessageUtils.CE_ID)
? (A) this.get(CloudEventMessageUtils.CE_ID)
: (A) this.get(CloudEventMessageUtils.ID);
}
@SuppressWarnings("unchecked")
public <A> A getSource() {
return this.containsKey(CloudEventMessageUtils.CE_SOURCE)
? (A) this.get(CloudEventMessageUtils.CE_SOURCE)
: (A) this.get(CloudEventMessageUtils.SOURCE);
}
@SuppressWarnings("unchecked")
public <A> A getSpecversion() {
return this.containsKey(CloudEventMessageUtils.CE_SPECVERSION)
? (A) this.get(CloudEventMessageUtils.CE_SPECVERSION)
: (A) this.get(CloudEventMessageUtils.SPECVERSION);
}
@SuppressWarnings("unchecked")
public <A> A getType() {
return this.containsKey(CloudEventMessageUtils.CE_TYPE)
? (A) this.get(CloudEventMessageUtils.CE_TYPE)
: (A) this.get(CloudEventMessageUtils.TYPE);
}
@SuppressWarnings("unchecked")
public <A> A getDataContentType() {
return this.containsKey(CloudEventMessageUtils.CE_DATACONTENTTYPE)
? (A) this.get(CloudEventMessageUtils.CE_DATACONTENTTYPE)
: (A) this.get(CloudEventMessageUtils.DATACONTENTTYPE);
}
public void setDataContentType(String datacontenttype) {
this.put(CloudEventMessageUtils.CE_DATACONTENTTYPE, datacontenttype);
}
@SuppressWarnings("unchecked")
public <A> A getAtttribute(String name) {
return (A) this.get(name);
}
}

View File

@@ -0,0 +1,60 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
import org.springframework.messaging.MessageHeaders;
/**
*
* @author Oleg Zhurakousky
* @since 3.1
*/
public interface CloudEventAtttributesProvider {
/**
* Will construct instance of {@link CloudEventAttributes} setting its required attributes.
*
* @param ce_id value for Cloud Event 'id' attribute
* @param ce_specversion value for Cloud Event 'specversion' attribute
* @param ce_source value for Cloud Event 'source' attribute
* @param ce_type value for Cloud Event 'type' attribute
* @return instance of {@link CloudEventAttributes}
*/
CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type);
/**
* Will construct instance of {@link CloudEventAttributes}
* Should default/generate cloud event ID and SPECVERSION.
*
* @param ce_source value for Cloud Event 'source' attribute
* @param ce_type value for Cloud Event 'type' attribute
* @return instance of {@link CloudEventAttributes}
*/
CloudEventAttributes get(String ce_source, String ce_type);
/**
* Will construct instance of {@link CloudEventAttributes} from {@link MessageHeaders}.
*
* Should copy Cloud Event related headers into an instance of {@link CloudEventAttributes}
* NOTE: Certain headers must not be copied.
*
* @param headers instance of {@link MessageHeaders}
* @return modifiable instance of {@link CloudEventAttributes}
*/
RequiredAttributeAccessor get(MessageHeaders headers);
}

View File

@@ -50,7 +50,7 @@ public class CloudEventDataContentTypeMessagePreProcessor implements Function<Me
private final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver();
private final MimeType cloudEventContentType = MimeTypeUtils.parseMimeType("application/cloudevents");
private final MimeType cloudEventContentType = CloudEventMessageUtils.APPLICATION_CLOUDEVENTS;
private final CompositeMessageConverter messageConverter;
@@ -62,7 +62,7 @@ public class CloudEventDataContentTypeMessagePreProcessor implements Function<Me
@SuppressWarnings("unchecked")
@Override
public Message<?> apply(Message<?> inputMessage) {
if (CloudEventUtils.isBinary(inputMessage)) {
if (CloudEventMessageUtils.isBinary(inputMessage.getHeaders())) {
String dataContentType = this.getDataContentType(inputMessage.getHeaders());
Message<?> message = MessageBuilder.fromMessage(inputMessage)
.setHeader(MessageHeaders.CONTENT_TYPE, dataContentType)
@@ -78,7 +78,7 @@ public class CloudEventDataContentTypeMessagePreProcessor implements Function<Me
.parseMimeType(contentType.getType() + "/" + suffix);
Message<?> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)
.setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType)
.setHeader(CloudEventUtils.CE_DATACONTENTTYPE, dataContentType).build();
.setHeader(CloudEventMessageUtils.CE_DATACONTENTTYPE, dataContentType).build();
Map<String, Object> structuredCloudEvent = (Map<String, Object>) this.messageConverter
.fromMessage(cloudEventMessage, Map.class);
Message<?> binaryCeMessage = this.buildCeMessageFromStructured(structuredCloudEvent);
@@ -90,27 +90,27 @@ public class CloudEventDataContentTypeMessagePreProcessor implements Function<Me
}
private Message<?> buildCeMessageFromStructured(Map<String, Object> structuredCloudEvent) {
MessageBuilder<?> builder = MessageBuilder.withPayload(structuredCloudEvent.get(CloudEventUtils.DATA));
structuredCloudEvent.remove(CloudEventUtils.DATA);
MessageBuilder<?> builder = MessageBuilder.withPayload(structuredCloudEvent.get(CloudEventMessageUtils.DATA));
structuredCloudEvent.remove(CloudEventMessageUtils.DATA);
builder.copyHeaders(structuredCloudEvent);
return builder.build();
}
private String getDataContentType(MessageHeaders headers) {
if (headers.containsKey(CloudEventUtils.DATACONTENTTYPE)) {
return (String) headers.get(CloudEventUtils.DATACONTENTTYPE);
if (headers.containsKey(CloudEventMessageUtils.DATACONTENTTYPE)) {
return (String) headers.get(CloudEventMessageUtils.DATACONTENTTYPE);
}
else if (headers.containsKey(CloudEventUtils.CE_DATACONTENTTYPE)) {
return (String) headers.get(CloudEventUtils.CE_DATACONTENTTYPE);
else if (headers.containsKey(CloudEventMessageUtils.CE_DATACONTENTTYPE)) {
return (String) headers.get(CloudEventMessageUtils.CE_DATACONTENTTYPE);
}
else if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
return headers.get(MessageHeaders.CONTENT_TYPE).toString();
}
return "application/json";
return MimeTypeUtils.APPLICATION_JSON_VALUE;
}
private boolean isStructured(Message<?> message) {
if (!CloudEventUtils.isBinary(message)) {
if (!CloudEventMessageUtils.isBinary(message.getHeaders())) {
Map<String, Object> headers = message.getHeaders();
if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) {

View File

@@ -32,7 +32,8 @@ import org.springframework.util.MimeType;
public class CloudEventJsonMessageConverter extends JsonMessageConverter {
public CloudEventJsonMessageConverter(JsonMapper jsonMapper) {
super(jsonMapper, new MimeType("application", "cloudevents+json"));
super(jsonMapper, new MimeType(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType(),
CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype() + "+json"));
this.setStrictContentTypeMatch(true);
}
}

View File

@@ -19,6 +19,8 @@ package org.springframework.cloud.function.cloudevent;
import java.util.Map;
import org.springframework.messaging.Message;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
/**
* Miscellaneous utility methods to deal with Cloud Events - https://cloudevents.io/.
@@ -28,12 +30,22 @@ import org.springframework.messaging.Message;
* @author Oleg Zhurakousky
* @since 3.1
*/
public final class CloudEventUtils {
public final class CloudEventMessageUtils {
private CloudEventUtils() {
private CloudEventMessageUtils() {
}
/**
* String value of 'application/cloudevents' mime type.
*/
public static String APPLICATION_CLOUDEVENTS_VALUE = "application/cloudevents";
/**
* {@link MimeType} instance representing 'application/cloudevents' mime type.
*/
public static MimeType APPLICATION_CLOUDEVENTS = MimeTypeUtils.parseMimeType(APPLICATION_CLOUDEVENTS_VALUE);
/**
* Prefix for attributes.
*/
@@ -132,16 +144,15 @@ public final class CloudEventUtils {
/**
* Checks if {@link Message} represents cloud event in binary-mode.
*/
public static boolean isBinary(Message<?> message) {
Map<String, Object> headers = message.getHeaders();
return (headers.containsKey("id")
&& headers.containsKey("source")
&& headers.containsKey("specversion")
&& headers.containsKey("type"))
public static boolean isBinary(Map<String, Object> headers) {
return (headers.containsKey(ID)
&& headers.containsKey(SOURCE)
&& headers.containsKey(SPECVERSION)
&& headers.containsKey(TYPE))
||
(headers.containsKey("ce_id")
&& headers.containsKey("ce_source")
&& headers.containsKey("ce_specversion")
&& headers.containsKey("ce_type"));
(headers.containsKey(CE_ID)
&& headers.containsKey(CE_SOURCE)
&& headers.containsKey(CE_SPECVERSION)
&& headers.containsKey(CE_TYPE));
}
}

View File

@@ -0,0 +1,64 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
/**
*
* @author Oleg Zhurakousky
* @since 3.1
*
*/
public class DefaultCloudEventAttributesProvider implements CloudEventAtttributesProvider {
/*
* should i provide instance() method for convinience or should it be always injected into function
*/
@Override
public CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type) {
Assert.hasText(ce_id, "'ce_id' must not be null or empty");
Assert.hasText(ce_specversion, "'ce_specversion' must not be null or empty");
Assert.hasText(ce_source, "'ce_source' must not be null or empty");
Assert.hasText(ce_type, "'ce_type' must not be null or empty");
Map<String, Object> requiredAttributes = new HashMap<>();
requiredAttributes.put(CloudEventMessageUtils.CE_ID, ce_id);
requiredAttributes.put(CloudEventMessageUtils.CE_SPECVERSION, ce_specversion);
requiredAttributes.put(CloudEventMessageUtils.CE_SOURCE, ce_source);
requiredAttributes.put(CloudEventMessageUtils.CE_TYPE, ce_type);
return new CloudEventAttributes(requiredAttributes);
}
@Override
public CloudEventAttributes get(String ce_source, String ce_type) {
return this.get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type);
}
/**
* By default it will copy all the headers while exposing accessor to allow user to modify any of them.
*/
@Override
public RequiredAttributeAccessor get(MessageHeaders headers) {
return new RequiredAttributeAccessor(headers);
}
}

View File

@@ -0,0 +1,56 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
import java.util.Map;
/**
*
* @author Oleg Zhurakousky
* @since 3.1
*/
public class RequiredAttributeAccessor extends CloudEventAttributes {
/**
*
*/
private static final long serialVersionUID = 859410409447601477L;
RequiredAttributeAccessor(Map<String, Object> headers) {
super(headers);
}
public RequiredAttributeAccessor setId(String id) {
this.put(CloudEventMessageUtils.CE_ID, id);
return this;
}
public RequiredAttributeAccessor setSource(String source) {
this.put(CloudEventMessageUtils.CE_SOURCE, source);
return this;
}
public RequiredAttributeAccessor setSpecversion(String specversion) {
this.put(CloudEventMessageUtils.CE_SPECVERSION, specversion);
return this;
}
public RequiredAttributeAccessor setType(String type) {
this.put(CloudEventMessageUtils.CE_TYPE, type);
return this;
}
}

View File

@@ -30,8 +30,10 @@ import com.google.gson.Gson;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.cloudevent.CloudEventAtttributesProvider;
import org.springframework.cloud.function.cloudevent.CloudEventDataContentTypeMessagePreProcessor;
import org.springframework.cloud.function.cloudevent.CloudEventJsonMessageConverter;
import org.springframework.cloud.function.cloudevent.DefaultCloudEventAttributesProvider;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistry;
@@ -72,6 +74,11 @@ public class ContextFunctionCatalogAutoConfiguration {
static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper";
@Bean
public CloudEventAtttributesProvider cloudEventAttributesProvider() {
return new DefaultCloudEventAttributesProvider();
}
@Bean
public FunctionRegistry functionCatalog(List<MessageConverter> messageConverters, JsonMapper jsonMapper, ConfigurableApplicationContext context) {
ConfigurableConversionService conversionService = (ConfigurableConversionService) context.getBeanFactory().getConversionService();