Improve and add javadocs
Related to GH-422 and GH-606
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -20,10 +20,22 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
|
||||
/**
|
||||
* Utility class to assist with accessing and setting Cloud Events attributes from {@link MessageHeaders}.
|
||||
* <br><br>
|
||||
* It is effectively a wrapper over {@link MessageHeaders} which is a {@link Map}.
|
||||
* It also provides best effort to both discover the actual attribute name (regardless of the prefix)
|
||||
* as well as set appropriate attribute name.
|
||||
* <br><br>
|
||||
* For example, If there is an attribute `ce-source` or `ce_source` or 'source`, by simply calling getSource()
|
||||
* we'll discover it and will return its value.
|
||||
* <br>
|
||||
* Similar effort will happen during the setting of the attribute. If you provide {@link #prefixToUse} we will
|
||||
* use it otherwise we'll attempt to determine based on current execution context which prefix to use.
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Dave Syer
|
||||
@@ -45,68 +57,111 @@ public class CloudEventAttributes extends HashMap<String, Object> {
|
||||
this.prefixToUse = prefixToUse;
|
||||
}
|
||||
|
||||
|
||||
public CloudEventAttributes(Map<String, Object> headers) {
|
||||
this(headers, null);
|
||||
}
|
||||
|
||||
public CloudEventAttributes setId(String id) {
|
||||
if (StringUtils.hasText(this.prefixToUse)) {
|
||||
this.remove(this.getAttributeName(CloudEventMessageUtils.ID));
|
||||
this.put(this.prefixToUse + CloudEventMessageUtils.ID, id);
|
||||
}
|
||||
else {
|
||||
this.put(this.getAttributeName(CloudEventMessageUtils.ID), id);
|
||||
}
|
||||
this.setAtttribute(CloudEventMessageUtils.ID, id);
|
||||
return this;
|
||||
}
|
||||
|
||||
public <A> A getId() {
|
||||
A id = this.getAtttribute(CloudEventMessageUtils.ID);
|
||||
if (id instanceof UUID) {
|
||||
id = null;
|
||||
}
|
||||
return id;
|
||||
}
|
||||
|
||||
public CloudEventAttributes setSource(String source) {
|
||||
if (StringUtils.hasText(this.prefixToUse)) {
|
||||
this.remove(this.getAttributeName(CloudEventMessageUtils.SOURCE));
|
||||
this.put(this.prefixToUse + CloudEventMessageUtils.SOURCE, source);
|
||||
}
|
||||
else {
|
||||
this.put(this.getAttributeName(CloudEventMessageUtils.SOURCE), source);
|
||||
}
|
||||
this.setAtttribute(CloudEventMessageUtils.SOURCE, source);
|
||||
return this;
|
||||
}
|
||||
|
||||
public <A> A getSource() {
|
||||
return this.getAtttribute(CloudEventMessageUtils.SOURCE);
|
||||
}
|
||||
|
||||
public CloudEventAttributes setSpecversion(String specversion) {
|
||||
if (StringUtils.hasText(this.prefixToUse)) {
|
||||
this.remove(this.getAttributeName(CloudEventMessageUtils.SPECVERSION));
|
||||
this.put(this.prefixToUse + CloudEventMessageUtils.SPECVERSION, specversion);
|
||||
}
|
||||
else {
|
||||
this.put(this.getAttributeName(CloudEventMessageUtils.SPECVERSION), specversion);
|
||||
}
|
||||
this.setAtttribute(CloudEventMessageUtils.SPECVERSION, specversion);
|
||||
return this;
|
||||
}
|
||||
|
||||
public <A> A getSpecversion() {
|
||||
return this.getAtttribute(CloudEventMessageUtils.SPECVERSION);
|
||||
}
|
||||
|
||||
public CloudEventAttributes setType(String type) {
|
||||
if (StringUtils.hasText(this.prefixToUse)) {
|
||||
this.remove(this.getAttributeName(CloudEventMessageUtils.TYPE));
|
||||
this.put(this.prefixToUse + CloudEventMessageUtils.TYPE, type);
|
||||
}
|
||||
else {
|
||||
this.put(this.getAttributeName(CloudEventMessageUtils.TYPE), type);
|
||||
}
|
||||
this.setAtttribute(CloudEventMessageUtils.TYPE, type);
|
||||
return this;
|
||||
}
|
||||
|
||||
public <A> A getType() {
|
||||
return this.getAtttribute(CloudEventMessageUtils.TYPE);
|
||||
}
|
||||
|
||||
public CloudEventAttributes setDataContentType(String datacontenttype) {
|
||||
this.setAtttribute(CloudEventMessageUtils.DATACONTENTTYPE, datacontenttype);
|
||||
return this;
|
||||
}
|
||||
|
||||
public <A> A getDataContentType() {
|
||||
return this.getAtttribute(CloudEventMessageUtils.DATACONTENTTYPE);
|
||||
}
|
||||
|
||||
public CloudEventAttributes setDataSchema(String dataschema) {
|
||||
this.setAtttribute(CloudEventMessageUtils.DATASCHEMA, dataschema);
|
||||
return this;
|
||||
}
|
||||
|
||||
public <A> A getDataSchema() {
|
||||
return this.getAtttribute(CloudEventMessageUtils.DATASCHEMA);
|
||||
}
|
||||
|
||||
public CloudEventAttributes setSubject(String subject) {
|
||||
this.setAtttribute(CloudEventMessageUtils.SUBJECT, subject);
|
||||
return this;
|
||||
}
|
||||
|
||||
public <A> A getSubect() {
|
||||
return this.getAtttribute(CloudEventMessageUtils.SUBJECT);
|
||||
}
|
||||
|
||||
public CloudEventAttributes setTime(String time) {
|
||||
this.setAtttribute(CloudEventMessageUtils.TIME, time);
|
||||
return this;
|
||||
}
|
||||
|
||||
public <A> A getTime() {
|
||||
return this.getAtttribute(CloudEventMessageUtils.TIME);
|
||||
}
|
||||
|
||||
/**
|
||||
* Will delegate to the underlying {@link Map} returning the value for the requested attribute or null.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <A> A getId() {
|
||||
if (this.containsKey(CloudEventMessageUtils.CANONICAL_ID)) {
|
||||
return (A) this.get(CloudEventMessageUtils.CANONICAL_ID);
|
||||
public <A> A getAtttribute(String attrName) {
|
||||
if (this.containsKey(CloudEventMessageUtils.ATTR_PREFIX + attrName)) {
|
||||
return (A) this.get(CloudEventMessageUtils.ATTR_PREFIX + attrName);
|
||||
}
|
||||
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)) {
|
||||
return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID);
|
||||
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attrName)) {
|
||||
return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attrName);
|
||||
}
|
||||
Object id = this.get(CloudEventMessageUtils.ID);
|
||||
if (!(id instanceof UUID)) {
|
||||
return (A) id;
|
||||
}
|
||||
return null;
|
||||
return (A) this.get(attrName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if this instance of {@link CloudEventAttributes} represents valid Cloud Event.
|
||||
* This implies that it contains all 4 required attributes (id, source, type & specversion)
|
||||
*
|
||||
* @return true if this instance represents a valid Cloud Event
|
||||
*/
|
||||
public boolean isValidCloudEvent() {
|
||||
return StringUtils.hasText(this.getId())
|
||||
&& StringUtils.hasText(this.getSource())
|
||||
&& StringUtils.hasText(this.getSpecversion())
|
||||
&& StringUtils.hasText(this.getType());
|
||||
}
|
||||
|
||||
String getAttributeName(String attributeName) {
|
||||
@@ -119,65 +174,14 @@ public class CloudEventAttributes extends HashMap<String, Object> {
|
||||
return attributeName;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <A> A getSource() {
|
||||
if (this.containsKey(CloudEventMessageUtils.CANONICAL_SOURCE)) {
|
||||
return (A) this.get(CloudEventMessageUtils.CANONICAL_SOURCE);
|
||||
private CloudEventAttributes setAtttribute(String attrName, String attrValue) {
|
||||
if (StringUtils.hasText(this.prefixToUse)) {
|
||||
this.remove(this.getAttributeName(attrName));
|
||||
this.put(this.prefixToUse + attrName, attrValue);
|
||||
}
|
||||
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) {
|
||||
return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE);
|
||||
else {
|
||||
this.put(this.getAttributeName(attrName), attrValue);
|
||||
}
|
||||
return (A) this.get(CloudEventMessageUtils.SOURCE);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <A> A getSpecversion() {
|
||||
if (this.containsKey(CloudEventMessageUtils.CANONICAL_SPECVERSION)) {
|
||||
return (A) this.get(CloudEventMessageUtils.CANONICAL_SPECVERSION);
|
||||
}
|
||||
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION)) {
|
||||
return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION);
|
||||
}
|
||||
return (A) this.get(CloudEventMessageUtils.SPECVERSION);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <A> A getType() {
|
||||
if (this.containsKey(CloudEventMessageUtils.CANONICAL_TYPE)) {
|
||||
return (A) this.get(CloudEventMessageUtils.CANONICAL_TYPE);
|
||||
}
|
||||
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) {
|
||||
return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE);
|
||||
}
|
||||
return (A) this.get(CloudEventMessageUtils.TYPE);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <A> A getDataContentType() {
|
||||
Object dataContentType;
|
||||
if (this.containsKey(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE)) {
|
||||
dataContentType = this.get(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE);
|
||||
}
|
||||
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATACONTENTTYPE)) {
|
||||
dataContentType = this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATACONTENTTYPE);
|
||||
}
|
||||
dataContentType = this.get(CloudEventMessageUtils.DATACONTENTTYPE);
|
||||
return (A) dataContentType;
|
||||
}
|
||||
|
||||
public void setDataContentType(String datacontenttype) {
|
||||
this.put(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, datacontenttype);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <A> A getAtttribute(String name) {
|
||||
return (A) this.get(name);
|
||||
}
|
||||
|
||||
public boolean isValidCloudEvent() {
|
||||
return StringUtils.hasText(this.getId())
|
||||
&& StringUtils.hasText(this.getSource())
|
||||
&& StringUtils.hasText(this.getSpecversion())
|
||||
&& StringUtils.hasText(this.getType());
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,20 @@ package org.springframework.cloud.function.cloudevent;
|
||||
|
||||
|
||||
/**
|
||||
* Strategy that should be implemented by the user to help with outgoing Cloud Event attributes.
|
||||
* <br><br>
|
||||
* The provided `attributes` are already initialized with default values, so you can only set the ones that you need.
|
||||
* <br>
|
||||
* Once implemented, simply configure it as a bean and the framework will invoke it before the outbound Cloud Event Message is finalized.
|
||||
*
|
||||
* <pre>{@code
|
||||
* @Bean
|
||||
* public CloudEventAttributesProvider cloudEventAttributesProvider() {
|
||||
* return attributes -> {
|
||||
* attributes.setSource("https://interface21.com/").setType("com.interface21");
|
||||
* };
|
||||
* }}
|
||||
* </pre>
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Dave Syer
|
||||
|
||||
@@ -35,7 +35,7 @@ import org.springframework.util.StringUtils;
|
||||
/**
|
||||
* Miscellaneous utility methods to deal with Cloud Events - https://cloudevents.io/.
|
||||
* <br>
|
||||
* Mainly for internal use within the framework;
|
||||
* Primarily intended for the internal use within the framework;
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Dave Syer
|
||||
@@ -202,20 +202,6 @@ public final class CloudEventMessageUtils {
|
||||
return get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type);
|
||||
}
|
||||
|
||||
// /**
|
||||
// * Will construct instance of {@link CloudEventAttributes} from {@link MessageHeaders}.
|
||||
// *
|
||||
// * Should copy Cloud Event related headers into an instance of {@link CloudEventAttributes}
|
||||
// * NOTE: Certain headers must not be copied.
|
||||
// *
|
||||
// * @param headers instance of {@link MessageHeaders}
|
||||
// * @return modifiable instance of {@link CloudEventAttributes}
|
||||
// */
|
||||
// public static CloudEventAttributes get(MessageHeaders headers) {
|
||||
// return new CloudEventAttributes(headers);
|
||||
// }
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Message<?> toBinary(Message<?> inputMessage, MessageConverter messageConverter) {
|
||||
|
||||
@@ -252,32 +238,6 @@ public final class CloudEventMessageUtils {
|
||||
return inputMessage;
|
||||
}
|
||||
|
||||
private static Message<?> buildCeMessageFromStructured(Map<String, Object> structuredCloudEvent, MessageHeaders originalHeaders) {
|
||||
String prefixToUse = determinePrefixToUse(originalHeaders);
|
||||
Object data = null;
|
||||
if (structuredCloudEvent.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA)) {
|
||||
data = structuredCloudEvent.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA);
|
||||
structuredCloudEvent.remove(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA);
|
||||
}
|
||||
else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.CANONICAL_DATA)) {
|
||||
data = structuredCloudEvent.get(CloudEventMessageUtils.CANONICAL_DATA);
|
||||
structuredCloudEvent.remove(CloudEventMessageUtils.CANONICAL_DATA);
|
||||
}
|
||||
else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.DATA)) {
|
||||
data = structuredCloudEvent.get(CloudEventMessageUtils.DATA);
|
||||
structuredCloudEvent.remove(CloudEventMessageUtils.DATA);
|
||||
}
|
||||
Assert.notNull(data, "'data' must not be null");
|
||||
MessageBuilder<?> builder = MessageBuilder.withPayload(data);
|
||||
CloudEventAttributes attributes = new CloudEventAttributes(structuredCloudEvent);
|
||||
builder.setHeader(prefixToUse + CloudEventMessageUtils.ID, attributes.getId());
|
||||
builder.setHeader(prefixToUse + CloudEventMessageUtils.SOURCE, attributes.getSource());
|
||||
builder.setHeader(prefixToUse + CloudEventMessageUtils.TYPE, attributes.getType());
|
||||
builder.setHeader(prefixToUse + CloudEventMessageUtils.SPECVERSION, attributes.getSpecversion());
|
||||
builder.copyHeaders(originalHeaders);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static String determinePrefixToUse(MessageHeaders messageHeaders) {
|
||||
Set<String> keys = messageHeaders.keySet();
|
||||
if (keys.contains("user-agent")) {
|
||||
@@ -304,6 +264,32 @@ public final class CloudEventMessageUtils {
|
||||
return generateDefaultAttributeValues(attributes, typeName, sourceName);
|
||||
}
|
||||
|
||||
private static Message<?> buildCeMessageFromStructured(Map<String, Object> structuredCloudEvent, MessageHeaders originalHeaders) {
|
||||
String prefixToUse = determinePrefixToUse(originalHeaders);
|
||||
Object data = null;
|
||||
if (structuredCloudEvent.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA)) {
|
||||
data = structuredCloudEvent.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA);
|
||||
structuredCloudEvent.remove(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA);
|
||||
}
|
||||
else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.CANONICAL_DATA)) {
|
||||
data = structuredCloudEvent.get(CloudEventMessageUtils.CANONICAL_DATA);
|
||||
structuredCloudEvent.remove(CloudEventMessageUtils.CANONICAL_DATA);
|
||||
}
|
||||
else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.DATA)) {
|
||||
data = structuredCloudEvent.get(CloudEventMessageUtils.DATA);
|
||||
structuredCloudEvent.remove(CloudEventMessageUtils.DATA);
|
||||
}
|
||||
Assert.notNull(data, "'data' must not be null");
|
||||
MessageBuilder<?> builder = MessageBuilder.withPayload(data);
|
||||
CloudEventAttributes attributes = new CloudEventAttributes(structuredCloudEvent);
|
||||
builder.setHeader(prefixToUse + CloudEventMessageUtils.ID, attributes.getId());
|
||||
builder.setHeader(prefixToUse + CloudEventMessageUtils.SOURCE, attributes.getSource());
|
||||
builder.setHeader(prefixToUse + CloudEventMessageUtils.TYPE, attributes.getType());
|
||||
builder.setHeader(prefixToUse + CloudEventMessageUtils.SPECVERSION, attributes.getSpecversion());
|
||||
builder.copyHeaders(originalHeaders);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private static CloudEventAttributes generateDefaultAttributeValues(CloudEventAttributes attributes, String source, String type) {
|
||||
if (attributes.isValidCloudEvent()) {
|
||||
return attributes
|
||||
|
||||
@@ -70,12 +70,6 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
|
||||
static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper";
|
||||
|
||||
// @Bean
|
||||
// @ConditionalOnMissingBean
|
||||
// public CloudEventAttributesProvider cloudEventAttributesProvider() {
|
||||
// return new DefaultCloudEventAttributesProvider();
|
||||
// }
|
||||
|
||||
@Bean
|
||||
public FunctionRegistry functionCatalog(List<MessageConverter> messageConverters, JsonMapper jsonMapper, ConfigurableApplicationContext context) {
|
||||
ConfigurableConversionService conversionService = (ConfigurableConversionService) context.getBeanFactory().getConversionService();
|
||||
|
||||
Reference in New Issue
Block a user