diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java similarity index 70% rename from spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesHelper.java rename to spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java index 0050c2df8..e1a2e1c88 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesHelper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java @@ -25,9 +25,11 @@ import org.springframework.util.StringUtils; /** * * @author Oleg Zhurakousky + * @author Dave Syer + * * @since 3.1 */ -public class CloudEventAttributesHelper extends HashMap { +public class CloudEventAttributes extends HashMap { /** * @@ -35,9 +37,60 @@ public class CloudEventAttributesHelper extends HashMap { private static final long serialVersionUID = 5393610770855366497L; + private final String prefixToUse; - CloudEventAttributesHelper(Map headers) { + public CloudEventAttributes(Map headers, String prefixToUse) { super(headers); + this.prefixToUse = prefixToUse; + } + + + public CloudEventAttributes(Map headers) { + this(headers, null); + } + + public CloudEventAttributes setId(String id) { + if (StringUtils.hasText(this.prefixToUse)) { + this.remove(this.getAttributeName(CloudEventMessageUtils.ID)); + this.put(this.prefixToUse + CloudEventMessageUtils.ID, id); + } + else { + this.put(this.getAttributeName(CloudEventMessageUtils.ID), id); + } + return this; + } + + public CloudEventAttributes setSource(String source) { + if (StringUtils.hasText(this.prefixToUse)) { + this.remove(this.getAttributeName(CloudEventMessageUtils.SOURCE)); + this.put(this.prefixToUse + CloudEventMessageUtils.SOURCE, source); + } + else { + this.put(this.getAttributeName(CloudEventMessageUtils.SOURCE), source); + } + return this; + } + + public CloudEventAttributes setSpecversion(String specversion) { + if (StringUtils.hasText(this.prefixToUse)) { + this.remove(this.getAttributeName(CloudEventMessageUtils.SPECVERSION)); + this.put(this.prefixToUse + CloudEventMessageUtils.SPECVERSION, specversion); + } + else { + this.put(this.getAttributeName(CloudEventMessageUtils.SPECVERSION), specversion); + } + return this; + } + + public CloudEventAttributes setType(String type) { + if (StringUtils.hasText(this.prefixToUse)) { + this.remove(this.getAttributeName(CloudEventMessageUtils.TYPE)); + this.put(this.prefixToUse + CloudEventMessageUtils.TYPE, type); + } + else { + this.put(this.getAttributeName(CloudEventMessageUtils.TYPE), type); + } + return this; } @SuppressWarnings("unchecked") diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java index baa309604..2833e38bc 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java @@ -16,22 +16,19 @@ package org.springframework.cloud.function.cloudevent; -import java.util.Map; - -import org.springframework.messaging.Message; /** * * @author Oleg Zhurakousky + * @author Dave Syer + * * @since 3.1 */ @FunctionalInterface public interface CloudEventAttributesProvider { /** * - * @param inputMessage input message used to invoke user functionality (e.g., function) - * @param result result of the invocation of user functionality (e.g., function) - * @return instance of {@link CloudEventAttributesHelper} + * @param attributes instance of {@link CloudEventAttributes} */ - Map generateDefaultCloudEventHeaders(Message inputMessage, Object result); + void generateDefaultCloudEventHeaders(CloudEventAttributes attributes); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index daf4d76b5..851d147d2 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -16,6 +16,7 @@ package org.springframework.cloud.function.cloudevent; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -38,6 +39,8 @@ import org.springframework.util.StringUtils; * Mainly for internal use within the framework; * * @author Oleg Zhurakousky + * @author Dave Syer + * * @since 3.1 */ public final class CloudEventMessageUtils { @@ -162,20 +165,20 @@ public final class CloudEventMessageUtils { * Checks if {@link Message} represents cloud event in binary-mode. */ public static boolean isBinary(Map headers) { - CloudEventAttributesHelper attributes = new CloudEventAttributesHelper(headers); + CloudEventAttributes attributes = new CloudEventAttributes(headers); return attributes.isValidCloudEvent(); } /** - * Will construct instance of {@link CloudEventAttributesHelper} setting its required attributes. + * Will construct instance of {@link CloudEventAttributes} setting its required attributes. * * @param ce_id value for Cloud Event 'id' attribute * @param ce_specversion value for Cloud Event 'specversion' attribute * @param ce_source value for Cloud Event 'source' attribute * @param ce_type value for Cloud Event 'type' attribute - * @return instance of {@link CloudEventAttributesHelper} + * @return instance of {@link CloudEventAttributes} */ - public static CloudEventAttributesHelper get(String ce_id, String ce_specversion, String ce_source, String ce_type) { + public static CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type) { Assert.hasText(ce_id, "'ce_id' must not be null or empty"); Assert.hasText(ce_specversion, "'ce_specversion' must not be null or empty"); Assert.hasText(ce_source, "'ce_source' must not be null or empty"); @@ -185,40 +188,40 @@ public final class CloudEventMessageUtils { requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SPECVERSION, ce_specversion); requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SOURCE, ce_source); requiredAttributes.put(CloudEventMessageUtils.CANONICAL_TYPE, ce_type); - return new CloudEventAttributesHelper(requiredAttributes); + return new CloudEventAttributes(requiredAttributes); } /** - * Will construct instance of {@link CloudEventAttributesHelper} + * Will construct instance of {@link CloudEventAttributes} * Should default/generate cloud event ID and SPECVERSION. * * @param ce_source value for Cloud Event 'source' attribute * @param ce_type value for Cloud Event 'type' attribute - * @return instance of {@link CloudEventAttributesHelper} + * @return instance of {@link CloudEventAttributes} */ - public static CloudEventAttributesHelper get(String ce_source, String ce_type) { + public static CloudEventAttributes get(String ce_source, String ce_type) { return get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type); } - /** - * Will construct instance of {@link CloudEventAttributesHelper} from {@link MessageHeaders}. - * - * Should copy Cloud Event related headers into an instance of {@link CloudEventAttributesHelper} - * NOTE: Certain headers must not be copied. - * - * @param headers instance of {@link MessageHeaders} - * @return modifiable instance of {@link CloudEventAttributesHelper} - */ - public static RequiredAttributeAccessor get(MessageHeaders headers) { - return new RequiredAttributeAccessor(headers); - } +// /** +// * Will construct instance of {@link CloudEventAttributes} from {@link MessageHeaders}. +// * +// * Should copy Cloud Event related headers into an instance of {@link CloudEventAttributes} +// * NOTE: Certain headers must not be copied. +// * +// * @param headers instance of {@link MessageHeaders} +// * @return modifiable instance of {@link CloudEventAttributes} +// */ +// public static CloudEventAttributes get(MessageHeaders headers) { +// return new CloudEventAttributes(headers); +// } @SuppressWarnings("unchecked") public static Message toBinary(Message inputMessage, MessageConverter messageConverter) { Map headers = inputMessage.getHeaders(); - CloudEventAttributesHelper attributes = new CloudEventAttributesHelper(headers); + CloudEventAttributes attributes = new CloudEventAttributes(headers); // first check the obvious and see if content-type is `cloudevents` if (!attributes.isValidCloudEvent() && headers.containsKey(MessageHeaders.CONTENT_TYPE)) { @@ -265,7 +268,7 @@ public final class CloudEventMessageUtils { } Assert.notNull(data, "'data' must not be null"); MessageBuilder builder = MessageBuilder.withPayload(data); - CloudEventAttributesHelper attributes = new CloudEventAttributesHelper(structuredCloudEvent); + CloudEventAttributes attributes = new CloudEventAttributes(structuredCloudEvent); builder.setHeader(prefixToUse + CloudEventMessageUtils.ID, attributes.getId()); builder.setHeader(prefixToUse + CloudEventMessageUtils.SOURCE, attributes.getSource()); builder.setHeader(prefixToUse + CloudEventMessageUtils.TYPE, attributes.getType()); @@ -282,4 +285,16 @@ public final class CloudEventMessageUtils { return CloudEventMessageUtils.ATTR_PREFIX; } } + + public static Map generateDefaultCloudEventHeaders(Message inputMessage, Object result, String applicationName) { + CloudEventAttributes attributes = new CloudEventAttributes(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage)); + if (attributes.isValidCloudEvent()) { + return attributes + .setSpecversion("1.0") + .setId(UUID.randomUUID().toString()) + .setType(result.getClass().getName()) + .setSource(applicationName); + } + return Collections.emptyMap(); + } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java deleted file mode 100644 index d49627c54..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.cloudevent; - -import java.util.Collections; -import java.util.Map; -import java.util.UUID; - -import org.springframework.beans.BeansException; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.core.env.ConfigurableEnvironment; -import org.springframework.messaging.Message; -import org.springframework.util.StringUtils; - -/** - * - * @author Oleg Zhurakousky - * @since 3.1 - * - */ -public class DefaultCloudEventAttributesProvider implements CloudEventAttributesProvider, ApplicationContextAware { - - private ConfigurableApplicationContext applicationContext; - - - @Override - public Map generateDefaultCloudEventHeaders(Message inputMessage, Object result) { - RequiredAttributeAccessor attributes = new RequiredAttributeAccessor(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage)); - if (attributes.isValidCloudEvent()) { - String applicationName = this.getApplicationName(); - return attributes - .setId(UUID.randomUUID().toString()) - .setType(result.getClass().getName()) - .setSource(applicationName); - } - return Collections.emptyMap(); - } - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = (ConfigurableApplicationContext) applicationContext; - } - - private String getApplicationName() { - ConfigurableEnvironment environment = this.applicationContext.getEnvironment(); - String name = environment.getProperty("spring.application.name"); - return "http://spring.io/" + (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId()); - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/RequiredAttributeAccessor.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/RequiredAttributeAccessor.java deleted file mode 100644 index 97a444b13..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/RequiredAttributeAccessor.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.cloudevent; - -import java.util.Map; - -import org.springframework.util.StringUtils; - -/** - * - * @author Oleg Zhurakousky - * @since 3.1 - */ -public class RequiredAttributeAccessor extends CloudEventAttributesHelper { - - private final String prefixToUse; - - /** - * - */ - private static final long serialVersionUID = 859410409447601477L; - - public RequiredAttributeAccessor(Map headers, String prefixToUse) { - super(headers); - this.prefixToUse = prefixToUse; - } - - public RequiredAttributeAccessor(Map headers) { - this(headers, null); - } - - public RequiredAttributeAccessor setId(String id) { - if (StringUtils.hasText(this.prefixToUse)) { - this.put(this.prefixToUse + CloudEventMessageUtils.ID, id); - } - else { - this.put(this.getAttributeName(CloudEventMessageUtils.ID), id); - } - return this; - } - - public RequiredAttributeAccessor setSource(String source) { - if (StringUtils.hasText(this.prefixToUse)) { - this.put(this.prefixToUse + CloudEventMessageUtils.SOURCE, source); - } - else { - this.put(this.getAttributeName(CloudEventMessageUtils.SOURCE), source); - } - return this; - } - - public RequiredAttributeAccessor setSpecversion(String specversion) { - if (StringUtils.hasText(this.prefixToUse)) { - this.put(this.prefixToUse + CloudEventMessageUtils.SPECVERSION, specversion); - } - else { - this.put(this.getAttributeName(CloudEventMessageUtils.SPECVERSION), specversion); - } - return this; - } - - public RequiredAttributeAccessor setType(String type) { - if (StringUtils.hasText(this.prefixToUse)) { - this.put(this.prefixToUse + CloudEventMessageUtils.TYPE, type); - } - else { - this.put(this.getAttributeName(CloudEventMessageUtils.TYPE), type); - } - return this; - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index a5979ae64..e77e5634c 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -19,6 +19,7 @@ package org.springframework.cloud.function.context.catalog; import java.lang.reflect.Method; import java.lang.reflect.Type; import java.util.Arrays; +import java.util.Map; import java.util.Set; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -33,7 +34,9 @@ import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; +import org.springframework.cloud.function.cloudevent.CloudEventAttributes; import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; +import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; @@ -42,6 +45,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.convert.ConversionService; +import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.messaging.Message; import org.springframework.messaging.converter.CompositeMessageConverter; import org.springframework.messaging.support.MessageBuilder; @@ -158,13 +162,20 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes); } - if (function != null && this.cloudEventAtttributesProvider != null) { + if (function != null) { BiFunction, Object, Message> invocationResultHeaderEnricher = new BiFunction, Object, Message>() { @Override public Message apply(Message inputMessage, Object invocationResult) { - Message message = MessageBuilder.withPayload(invocationResult).copyHeaders( - cloudEventAtttributesProvider.generateDefaultCloudEventHeaders(inputMessage, invocationResult)) + // TODO: Factor it out! Cloud Events specific code + Map generatedCeHeaders = CloudEventMessageUtils + .generateDefaultCloudEventHeaders(inputMessage, invocationResult, getApplicationName()); + CloudEventAttributes attributes = new CloudEventAttributes(generatedCeHeaders); + if (cloudEventAtttributesProvider != null) { + cloudEventAtttributesProvider.generateDefaultCloudEventHeaders(attributes); + } + Message message = MessageBuilder.withPayload(invocationResult) + .copyHeaders(generatedCeHeaders) .build(); return message; @@ -176,7 +187,11 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp return (T) function; } - + private String getApplicationName() { + ConfigurableEnvironment environment = this.applicationContext.getEnvironment(); + String name = environment.getProperty("spring.application.name"); + return "http://spring.io/" + (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId()); + } private Object discoverFunctionInBeanFactory(String functionName) { Object functionCandidate = null; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index aed66e6e4..a365e08b4 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -30,8 +30,6 @@ import com.google.gson.Gson; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; -import org.springframework.cloud.function.cloudevent.DefaultCloudEventAttributesProvider; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistry; @@ -72,11 +70,11 @@ public class ContextFunctionCatalogAutoConfiguration { static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper"; - @Bean - @ConditionalOnMissingBean - public CloudEventAttributesProvider cloudEventAttributesProvider() { - return new DefaultCloudEventAttributesProvider(); - } +// @Bean +// @ConditionalOnMissingBean +// public CloudEventAttributesProvider cloudEventAttributesProvider() { +// return new DefaultCloudEventAttributesProvider(); +// } @Bean public FunctionRegistry functionCatalog(List messageConverters, JsonMapper jsonMapper, ConfigurableApplicationContext context) { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java index 83b20f88e..67e2b5cf8 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java @@ -45,7 +45,7 @@ public class CloudEventTypeConversionTests { @Test public void testFromMessageBinaryPayloadMatchesType() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils + CloudEventAttributes ceAttributes = CloudEventMessageUtils .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); ceAttributes.setDataContentType("text/plain"); Message message = MessageBuilder.withPayload("Hello Ricky").copyHeaders(ceAttributes).build(); @@ -57,7 +57,7 @@ public class CloudEventTypeConversionTests { @Test public void testFromMessageBinaryPayloadDoesNotMatchType() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils + CloudEventAttributes ceAttributes = CloudEventMessageUtils .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) .copyHeaders(ceAttributes) @@ -71,7 +71,7 @@ public class CloudEventTypeConversionTests { @Test // JsonMessageConverter does some special things between byte[] and String so this works public void testFromMessageBinaryPayloadNoDataContentTypeToString() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils + CloudEventAttributes ceAttributes = CloudEventMessageUtils .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) .copyHeaders(ceAttributes) @@ -85,7 +85,7 @@ public class CloudEventTypeConversionTests { @Test // Unlike the previous test the type here is POJO so no special treatement public void testFromMessageBinaryPayloadNoDataContentTypeToPOJO() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); + CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) .copyHeaders(ceAttributes) .setHeader(MessageHeaders.CONTENT_TYPE, @@ -98,7 +98,7 @@ public class CloudEventTypeConversionTests { @Test // will fall on default CT which is json public void testFromMessageBinaryPayloadNoDataContentTypeToPOJOThatWorks() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); + CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("{\"name\":\"Ricky\"}".getBytes()) .copyHeaders(ceAttributes) .setHeader(MessageHeaders.CONTENT_TYPE, diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java index 6e5cb1156..3a781c298 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java @@ -21,7 +21,7 @@ import java.util.function.Function; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.function.cloudevent.CloudEventAttributesHelper; +import org.springframework.cloud.function.cloudevent.CloudEventAttributes; import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.context.annotation.Bean; @@ -84,20 +84,21 @@ public class CloudeventDemoApplication { } @Bean - public Function, Message> consumeAndProduceCloudEvent(CloudEventAttributesProvider ceAttrProvider) { + public Function, Message> consumeAndProduceCloudEvent() { return ceMessage -> { SpringReleaseEvent data = ceMessage.getPayload(); data.setVersion("2.0"); data.setReleaseDateAsString("01-10-2006"); - CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get(ceMessage.getHeaders()) - .setSource("https://interface21.com/") - .setType("com.interface21"); - - return MessageBuilder.withPayload(data).copyHeaders(ceAttributes).build(); + return MessageBuilder.withPayload(data).build(); }; } + @Bean + public CloudEventAttributesProvider cloudEventAttributesProvider() { + return attributes -> attributes.setSource("https://interface21.com/").setType("com.interface21"); + } + @Bean public Function, Map> consumeAndProduceCloudEventAsMapToMap() { diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java index c841cff80..27adb8d70 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java @@ -262,6 +262,10 @@ public class CloudeventDemoApplicationRESTTests { .isEqualTo(Collections.singletonList("http://spring.io/application-application")); assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName())); + assertThat(response.getHeaders().get(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.TYPE)).isNull(); + assertThat(response.getHeaders().get(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SOURCE)).isNull(); + assertThat(response.getHeaders().get(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.ID)).isNull(); + assertThat(response.getHeaders().get(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION)).isNull(); } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java index e0c13d582..f7b8c0084 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; import java.util.stream.Stream; @@ -32,6 +33,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.beans.factory.ObjectProvider; +import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; @@ -195,8 +197,10 @@ public class RequestProcessor { result = Mono.from(result) .map(message -> MessageUtils.unpack(handler, message)) .doOnNext(value -> { - builder.headers(HeaderUtils.sanitize(request.headers())); addHeaders(builder, value); + if (!isValidCloudEvent(value.getHeaders().keySet())) { + builder.headers(HeaderUtils.sanitize(request.headers())); + } }) .map(message -> message.getPayload()); } @@ -204,8 +208,10 @@ public class RequestProcessor { result = Flux.from(result) .map(message -> MessageUtils.unpack(handler, message)) .doOnNext(value -> { - builder.headers(HeaderUtils.sanitize(request.headers())); addHeaders(builder, value); + if (!isValidCloudEvent(value.getHeaders().keySet())) { + builder.headers(HeaderUtils.sanitize(request.headers())); + } }) .map(message -> message.getPayload()); } @@ -225,6 +231,13 @@ public class RequestProcessor { return Mono.from(result).flatMap(body -> Mono.just(builder.body(body))); } + public boolean isValidCloudEvent(Set headerKeys) { + return headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID) + && headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE) + && headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE) + && headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION); + } + // this seem to be very relevant to AWS container tests private Flux messages(FunctionWrapper request, Object function, Flux flux) { Map headers = new HashMap<>(HeaderUtils.fromHttp(request.headers()));