diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java index e1a2e1c88..455c12957 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java @@ -18,6 +18,7 @@ package org.springframework.cloud.function.cloudevent; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import org.springframework.util.StringUtils; @@ -101,6 +102,10 @@ public class CloudEventAttributes extends HashMap { else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)) { return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID); } + Object id = this.get(CloudEventMessageUtils.ID); + if (!(id instanceof UUID)) { + return (A) id; + } return null; } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 69c14c039..e3a4832b4 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -237,9 +237,10 @@ public final class CloudEventMessageUtils { .parseMimeType(contentType.getType() + "/" + suffix); Message cloudEventMessage = MessageBuilder.fromMessage(inputMessage) .setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType) - .setHeader(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, dataContentType).build(); + .setHeader(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, dataContentType) + .build(); Map structuredCloudEvent = (Map) messageConverter.fromMessage(cloudEventMessage, Map.class); - Message binaryCeMessage = buildCeMessageFromStructured(structuredCloudEvent, determinePrefixToUse(inputMessage)); + Message binaryCeMessage = buildCeMessageFromStructured(structuredCloudEvent, inputMessage.getHeaders()); return binaryCeMessage; } } @@ -251,7 +252,8 @@ public final class CloudEventMessageUtils { return inputMessage; } - private static Message buildCeMessageFromStructured(Map structuredCloudEvent, String prefixToUse) { + private static Message buildCeMessageFromStructured(Map structuredCloudEvent, MessageHeaders originalHeaders) { + String prefixToUse = determinePrefixToUse(originalHeaders); Object data = null; if (structuredCloudEvent.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA)) { data = structuredCloudEvent.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA); @@ -272,11 +274,12 @@ public final class CloudEventMessageUtils { builder.setHeader(prefixToUse + CloudEventMessageUtils.SOURCE, attributes.getSource()); builder.setHeader(prefixToUse + CloudEventMessageUtils.TYPE, attributes.getType()); builder.setHeader(prefixToUse + CloudEventMessageUtils.SPECVERSION, attributes.getSpecversion()); + builder.copyHeaders(originalHeaders); return builder.build(); } - public static String determinePrefixToUse(Message inputMessage) { - Set keys = inputMessage.getHeaders().keySet(); + public static String determinePrefixToUse(MessageHeaders messageHeaders) { + Set keys = messageHeaders.keySet(); if (keys.contains("user-agent")) { return CloudEventMessageUtils.HTTP_ATTR_PREFIX; } @@ -296,9 +299,9 @@ public final class CloudEventMessageUtils { return attributes; } - public static CloudEventAttributes generateAttributes(Message inputMessage, Object result, String applicationName) { - CloudEventAttributes attributes = new CloudEventAttributes(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage)); - return generateDefaultAttributeValues(attributes, result.getClass().getName(), applicationName); + public static CloudEventAttributes generateAttributes(Message inputMessage, String typeName, String sourceName) { + CloudEventAttributes attributes = new CloudEventAttributes(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage.getHeaders())); + return generateDefaultAttributeValues(attributes, typeName, sourceName); } private static CloudEventAttributes generateDefaultAttributeValues(CloudEventAttributes attributes, String source, String type) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index 4aa9968ef..64099aa9b 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -163,13 +163,12 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp if (function != null) { BiFunction, Object, Message> invocationResultHeaderEnricher = new BiFunction, Object, Message>() { - @Override public Message apply(Message inputMessage, Object invocationResult) { // TODO: Factor it out! Cloud Events specific code CloudEventAttributes generatedCeHeaders = CloudEventMessageUtils - .generateAttributes(inputMessage, invocationResult, getApplicationName()); - CloudEventAttributes attributes = new CloudEventAttributes(generatedCeHeaders, CloudEventMessageUtils.determinePrefixToUse(inputMessage)); + .generateAttributes(inputMessage, invocationResult.getClass().getName(), getApplicationName()); + CloudEventAttributes attributes = new CloudEventAttributes(generatedCeHeaders, CloudEventMessageUtils.determinePrefixToUse(inputMessage.getHeaders())); if (cloudEventAtttributesProvider != null) { cloudEventAtttributesProvider.generateDefaultCloudEventHeaders(attributes); } diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java index d2efb41dc..2ac2565ca 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java @@ -22,12 +22,9 @@ import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.web.client.RestTemplateBuilder; -import org.springframework.cloud.function.cloudevent.CloudEventAttributes; import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.web.util.HeaderUtils; @@ -37,7 +34,6 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; -import org.springframework.web.client.RestTemplate; /** * Sample application that demonstrates how user functions can be triggered by cloud event. @@ -55,6 +51,8 @@ import org.springframework.web.client.RestTemplate; @SpringBootApplication public class CloudeventDemoApplication { + boolean consumerSuccess; + public static void main(String[] args) throws Exception { SpringApplication.run(CloudeventDemoApplication.class, args); } @@ -145,22 +143,8 @@ public class CloudeventDemoApplication { Assert.notEmpty(idHeader, "'id' must not be null"); List specversionHeader = entity.getHeaders().get("ce-specversion"); Assert.notEmpty(specversionHeader, "'specversion' must not be null"); + this.consumerSuccess = true; }; } - - @Bean - @ConditionalOnExpression("'${K_SINK:}'!=''") - public Consumer>> sink(CloudEventAttributesProvider provider, - RestTemplateBuilder builder, @Value("${K_SINK}") String url) { - RestTemplate client = builder.build(); - return eventMessage -> { - RequestEntity> entity = RequestEntity.post(URI.create("http://foo.com")) - .headers(HeaderUtils.fromMessage( - new MessageHeaders(CloudEventMessageUtils.generateAttributes(eventMessage, provider, "io.spring")))) - .body(eventMessage.getPayload()); - client.exchange(entity, byte[].class); - }; - } - } diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java index 288d33885..76c92c2da 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java @@ -171,6 +171,11 @@ public class CloudeventDemoApplicationRESTTests { response = testRestTemplate.exchange(re, String.class); assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0"); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) + .isEqualTo(Collections.singletonList("https://interface21.com/")); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) + .isEqualTo(Collections.singletonList("com.interface21")); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)).isNotNull(); } @Test @@ -202,6 +207,11 @@ public class CloudeventDemoApplicationRESTTests { response = testRestTemplate.exchange(re, String.class); assertThat(response.getBody()).isEqualTo("{\"version\":\"1.0\",\"releaseName\":\"Spring Framework\",\"releaseDate\":\"24-03-2004\"}"); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) + .isEqualTo(Collections.singletonList("https://interface21.com/")); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) + .isEqualTo(Collections.singletonList("com.interface21")); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)).isNotNull(); } @Test @@ -271,7 +281,7 @@ public class CloudeventDemoApplicationRESTTests { @Test - public void testAsStructuralPojoToPojo() throws Exception { + public void testAsStructuralPojoToPojoDefaultDataContentType() throws Exception { ApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class); JsonMapper mapper = context.getBean(JsonMapper.class); @@ -280,7 +290,6 @@ public class CloudeventDemoApplicationRESTTests { " \"type\" : \"org.springframework\",\n" + " \"source\" : \"https://spring.io/\",\n" + " \"id\" : \"A234-1234-1234\",\n" + -// " \"ce-datacontenttype\" : \"application/json\",\n" + " \"data\" : {\n" + " \"version\" : \"1.0\",\n" + " \"releaseName\" : \"Spring Framework\",\n" + @@ -288,7 +297,7 @@ public class CloudeventDemoApplicationRESTTests { " }\n" + "}"; - System.out.println(payload); + HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.valueOf("application/cloudevents+json;charset=utf-8")); @@ -308,16 +317,16 @@ public class CloudeventDemoApplicationRESTTests { assertThat(springReleaseEvent.getReleaseName()).isEqualTo("Spring Framework"); assertThat(springReleaseEvent.getVersion()).isEqualTo("10.0"); - -// assertThat(response.getHeaders().get(CloudEventMessageUtils.CANONICAL_SOURCE)) -// .isEqualTo(Collections.singletonList("http://spring.io/application-application")); -// assertThat(response.getHeaders().get(CloudEventMessageUtils.CANONICAL_TYPE)) -// .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName())); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) + .isEqualTo(Collections.singletonList("https://interface21.com/")); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) + .isEqualTo(Collections.singletonList("com.interface21")); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)).isNotNull(); } @Test public void testPojoConsumer() throws Exception { - SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {}); + ApplicationContext context = SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {}); HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON); String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; @@ -325,6 +334,8 @@ public class CloudeventDemoApplicationRESTTests { RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/pojoConsumer")); ResponseEntity response = testRestTemplate.exchange(re, String.class); assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + CloudeventDemoApplication application = context.getBean(CloudeventDemoApplication.class); + assertThat(application.consumerSuccess).isTrue(); } private URI constructURI(String path) throws Exception {