diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java index 47ef6caf1..1e5952648 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java @@ -178,6 +178,13 @@ public final class CloudEventMessageBuilder { else if (key.startsWith(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)) { this.swapPrefix(key, CloudEventMessageUtils.KAFKA_ATTR_PREFIX, attributePrefixToUse); } + else if (key.equals(CloudEventMessageUtils._ID) || key.equals(CloudEventMessageUtils._SPECVERSION) || + key.equals(CloudEventMessageUtils._SOURCE) || key.equals(CloudEventMessageUtils._TYPE) || + key.equals(CloudEventMessageUtils._DATASCHEMA) || key.equals(CloudEventMessageUtils._SCHEMAURL) || + key.equals(CloudEventMessageUtils._SUBJECT) || key.equals(CloudEventMessageUtils._TIME) || + key.equals(CloudEventMessageUtils._DATACONTENTTYPE)) { + this.swapPrefix(key, "", attributePrefixToUse); + } } } return doBuild(attributePrefixToUse); @@ -197,6 +204,13 @@ public final class CloudEventMessageBuilder { this.headers.put(prefix + CloudEventMessageUtils._ID, UUID.randomUUID().toString()); } this.headers.put(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE); + + if (!this.headers.containsKey(prefix + CloudEventMessageUtils._TYPE)) { + this.headers.put(prefix + CloudEventMessageUtils._TYPE, this.data.getClass().getName()); + } + if (!this.headers.containsKey(prefix + CloudEventMessageUtils._SOURCE)) { + this.headers.put(prefix + CloudEventMessageUtils._SOURCE, URI.create("https://spring.io/" + this.data.getClass().getName())); + } MessageHeaders headers = new MessageHeaders(this.headers); GenericMessage message = new GenericMessage(this.data, headers); Assert.isTrue(CloudEventMessageUtils.isCloudEvent(message), "The message does not appear to be a valid Cloud Event, " diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 2cd2ab008..21e17558b 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -317,6 +317,10 @@ public final class CloudEventMessageUtils { && message.getHeaders().containsKey(TYPE) && message.getHeaders().containsKey(SOURCE)) || + (message.getHeaders().containsKey(_SPECVERSION) + && message.getHeaders().containsKey(_TYPE) + && message.getHeaders().containsKey(_SOURCE)) + || (message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SPECVERSION) && message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _TYPE) && message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SOURCE)) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 7a66bd1db..9f70696d2 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -620,12 +620,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect private Object enrichInvocationResultIfNecessary(Object input, Object result) { if (result != null && !(result instanceof Publisher) && input instanceof Message) { if (result instanceof Message) { - Map headersMap = (Map) ReflectionUtils - .getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders()); - this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v)); if (functionInvocationHelper != null && CloudEventMessageUtils.isCloudEvent(((Message) input))) { result = functionInvocationHelper.postProcessResult(result, (Message) input); } + else { + Map headersMap = (Map) ReflectionUtils + .getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders()); + this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v)); + } } else { if (functionInvocationHelper != null && CloudEventMessageUtils.isCloudEvent(((Message) input))) { diff --git a/spring-cloud-function-samples/function-sample-cloudevent/pom.xml b/spring-cloud-function-samples/function-sample-cloudevent/pom.xml index 7d5aef6e5..bb942c769 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/pom.xml +++ b/spring-cloud-function-samples/function-sample-cloudevent/pom.xml @@ -8,7 +8,6 @@ function-sample-cloudevent Demo project for Spring Boot - org.springframework.boot spring-boot-starter-parent @@ -23,7 +22,6 @@ - org.springframework.boot spring-boot-starter-web @@ -31,33 +29,7 @@ org.springframework.cloud spring-cloud-function-web - - - - - - - - - - - - - - - - - - - - - - - - - - org.springframework.boot spring-boot-starter-test diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java deleted file mode 100644 index 995b8c48c..000000000 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.spring.cloudevent; - -import java.net.URI; -import java.util.List; -import java.util.Map; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.web.client.RestTemplateBuilder; -import org.springframework.cloud.function.cloudevent.CloudEventHeaderEnricher; -import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder; -import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; -import org.springframework.cloud.function.web.util.HeaderUtils; -import org.springframework.context.annotation.Bean; -import org.springframework.http.RequestEntity; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.util.Assert; - -/** - * Sample application that demonstrates how user functions can be triggered by cloud event. - * Events can come from anywhere (e.g., HTTP, Messaging, RSocket etc). - * Given that this particular sample comes already with spring-cloud-function-web support each - * function is a valid REST endpoint where function name signifies URL path (e.g., http://localhost:8080/asPOJOMessage). - * - * Simply start the application and post cloud event to individual function - (see individual 'curl' command at each function). - * - * You can also run CloudeventDemoApplicationTests. - * - * @author Oleg Zhurakousky - * - */ -@SpringBootApplication -public class CloudeventDemoApplication { - - boolean consumerSuccess; - - public static void main(String[] args) throws Exception { - SpringApplication.run(CloudeventDemoApplication.class, args); - } - - @Bean - public Function, String> asStringMessage() { - return v -> { - System.out.println("Received Cloud Event with raw data: " + v); - return v.getPayload(); - }; - } - - - @Bean - public Function asString() { - return v -> { - System.out.println("Received raw Cloud Event data: " + v); - return v; - }; - } - - - @Bean - public Function, String> asPOJOMessage() { - return v -> { - System.out.println("Received Cloud Event with POJO data: " + v); - return v.getPayload().toString(); - }; - } - - - @Bean - public Function asPOJO() { - return v -> { - System.out.println("Received POJO Cloud Event data: " + v); - return v.toString(); - }; - } - - @Bean - public Function, Message> consumeAndProduceCloudEvent() { - return ceMessage -> { - SpringReleaseEvent data = ceMessage.getPayload(); - data.setVersion("2.0"); - data.setReleaseDateAsString("01-10-2006"); - - return MessageBuilder.withPayload(data).build(); - }; - } - - @Bean - public CloudEventHeaderEnricher cloudEventHeaderEnricher() { - return headers -> { - return headers.setSource("https://interface21.com/").setType("com.interface21"); - }; - } - - - @Bean - public Function, Map> consumeAndProduceCloudEventAsMapToMap() { - return ceMessage -> { - ceMessage.put("version", "10.0"); - ceMessage.put("releaseDate", "01-10-2050"); - return ceMessage; - }; - } - - @Bean - public Function consumeAndProduceCloudEventAsPojoToPojo() { - return event -> { - event.setVersion("2.0"); - return event; - }; - } - - @Bean - public Consumer> pojoConsumer(CloudEventHeaderEnricher enricher, RestTemplateBuilder builder) { - return eventMessage -> { - Message newMessage = enricher.enrich(CloudEventMessageBuilder.fromMessage(eventMessage)).build(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX); - RequestEntity entity = RequestEntity.post(URI.create("http://foo.com")) - .headers(HeaderUtils.fromMessage(newMessage.getHeaders())) - .body(eventMessage.getPayload()); - List sourceHeader = entity.getHeaders().get("ce-source"); - Assert.isTrue(sourceHeader.get(0).equals("https://interface21.com/"), "'source' must be https://interface21.com/"); - List typeHeader = entity.getHeaders().get("ce-type"); - Assert.isTrue(typeHeader.get(0).equals("com.interface21"), "'source' must be com.interface21"); - List idHeader = entity.getHeaders().get("ce-id"); - Assert.notEmpty(idHeader, "'id' must not be null"); - List specversionHeader = entity.getHeaders().get("ce-specversion"); - Assert.notEmpty(specversionHeader, "'specversion' must not be null"); - this.consumerSuccess = true; - }; - } - -} diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/SpringReleaseEvent.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/SpringReleaseEvent.java deleted file mode 100644 index 2bf869554..000000000 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/SpringReleaseEvent.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.spring.cloudevent; - -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; - -import com.fasterxml.jackson.annotation.JsonFormat; - -/** - * An example POJO that represents cloud event data - * - * @author Oleg Zhurakousky - * - */ -public class SpringReleaseEvent { - - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy") - private Date releaseDate; - - private String releaseName; - - private String version; - - public Date getReleaseDate() { - return releaseDate; - } - - public void setReleaseDate(Date releaseDate) { - this.releaseDate = releaseDate; - } - - public void setReleaseDateAsString(String releaseDate) { - try { - this.releaseDate = new SimpleDateFormat("dd-MM-yyyy").parse(releaseDate); - } - catch (ParseException e) { - throw new IllegalArgumentException(e); - } - } - - public String getReleaseName() { - return releaseName; - } - - public void setReleaseName(String releaseName) { - this.releaseName = releaseName; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - @Override - public String toString() { - return "releaseDate:" + new SimpleDateFormat("dd-MM-yyyy").format(releaseDate) + "; releaseName:" + releaseName + "; version:" + version; - } -} diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java deleted file mode 100644 index e36788d10..000000000 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.spring.cloudevent; - -import java.util.function.Function; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.WebApplicationType; -import org.springframework.boot.builder.SpringApplicationBuilder; -import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder; -import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; -import org.springframework.cloud.function.context.FunctionCatalog; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; - -/** - * - * @author Oleg Zhurakousky - * - */ -public class CloudeventDemoApplicationFunctionTests { - - @Test - public void demoPureFunctionInvocation() { - - try(ConfigurableApplicationContext context = new SpringApplicationBuilder(CloudeventDemoApplication.class) - .web(WebApplicationType.NONE).run()) { - FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - - Message inputMessage = CloudEventMessageBuilder - .withData("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") - .setSource("https://spring.io/spring-event") - .setType("com.example.springevent") - .build(); - - /* - * NOTE how it makes no difference what the actual function signature - * is (see `asPOJOMessage` and `asPOJO` specifically). Type conversion will happen - * inside spring-cloud-function. - */ - Function, Message> asPojoMessage = catalog.lookup("asPOJOMessage"); - System.out.println(asPojoMessage.apply(inputMessage)); - - Function, Message> asPojo = catalog.lookup("asPOJO"); - System.out.println(asPojo.apply(inputMessage)); - - Function, Message> asString = catalog.lookup("asString"); - System.out.println(asString.apply(inputMessage)); - - Function, Message> asStringMessage = catalog.lookup("asStringMessage"); - System.out.println(asStringMessage.apply(inputMessage)); - } - } - - @Test - public void demoPureFunctionProduceConsumeCloudEvent() { - try(ConfigurableApplicationContext context = new SpringApplicationBuilder(CloudeventDemoApplication.class) - .web(WebApplicationType.NONE).run()) { - FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - - Message inputMessage = CloudEventMessageBuilder - .withData("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") - .setSource("https://spring.io/spring-event") - .setType("com.example.springevent") - .build(); - - /* - * NOTE how it makes no difference what the actual function signature - * is (see `asPOJOMessage` and `asPOJO` specifically). Type conversion will happen - * inside spring-cloud-function. - */ - Function, Message> asPojoMessage = catalog.lookup("consumeAndProduceCloudEvent"); - System.out.println(asPojoMessage.apply(inputMessage)); - } - } - - @Test - public void demoPureFunctionProduceConsumeCloudEventAsPojo() { - try(ConfigurableApplicationContext context = new SpringApplicationBuilder(CloudeventDemoApplication.class) - .web(WebApplicationType.NONE).run()) { - FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - - Message inputMessage = CloudEventMessageBuilder - .withData("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") - .setSource("https://spring.io/spring-event") - .setType("com.example.springevent") - .build(); - - /* - * NOTE how it makes no difference what the actual function signature - * is (see `asPOJOMessage` and `asPOJO` specifically). Type conversion will happen - * inside spring-cloud-function. - */ - Function, Message> asPojoMessage = catalog.lookup("consumeAndProduceCloudEventAsPojoToPojo"); - System.out.println(asPojoMessage.apply(inputMessage)); - } - } -} diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java deleted file mode 100644 index db6aeb308..000000000 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java +++ /dev/null @@ -1,421 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.spring.cloudevent; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.net.URI; -import java.text.SimpleDateFormat; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.UUID; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.test.web.client.TestRestTemplate; -import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; -import org.springframework.cloud.function.json.JsonMapper; -import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.http.RequestEntity; -import org.springframework.http.ResponseEntity; -import org.springframework.lang.Nullable; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.converter.AbstractMessageConverter; -import org.springframework.messaging.converter.MessageConverter; -import org.springframework.util.MimeType; -import org.springframework.util.SocketUtils; - -/** - * - * @author Oleg Zhurakousky - * - */ -public class CloudeventDemoApplicationRESTTests { - - private TestRestTemplate testRestTemplate = new TestRestTemplate(); - - @BeforeEach - public void init() throws Exception { - System.setProperty("server.port", String.valueOf(SocketUtils.findAvailableTcpPort())); - } - - /* - * This test demonstrates consumption of Cloud Event via HTTP POST - binary-mode message. - * According to specification - https://github.com/cloudevents/spec/blob/v1.0/spec.md - * - A "binary-mode message" is one where the event data is stored in the message body, - * and event attributes are stored as part of message meta-data. - * - * The above means that it fits perfectly with Spring Message model and as such there is - * absolutely nothing that needs to be done at the framework or user level to consume it. - * It just works! - * - * The example demonstrated via two types of functions - * - Function, String> asBinaryViaMessage; - * - Function asJustBinary; - */ - @Test - public void testAsBinaryMessageViaHTTP() throws Exception { - SpringApplication.run(CloudeventDemoApplication.class); - HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON); - // will work with either content type -// HttpHeaders headers = this.buildHeaders(MediaType.valueOf("application/cloudevents+json;charset=utf-8")); - - String payload = "{\"releaseDate\":\"2004-03-24\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; - - RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asStringMessage")); - ResponseEntity response = testRestTemplate.exchange(re, String.class); - - assertThat(response.getBody()).isEqualTo(payload); - - re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asString")); - response = testRestTemplate.exchange(re, String.class); - - assertThat(response.getBody()).isEqualTo(payload); - } - - /* - * The same as the previous two tests with the exception that cloud event data de-serialized into POJO. - * Again, given that abstractions for transparent type conversion already part of the Spring ecosystem nothing - * needed to be done at the framework or user level to consume it. - * It just works! - */ - @Test - public void testAsBinaryPOJOMessageViaHTTP() throws Exception { - SpringApplication.run(CloudeventDemoApplication.class); - - HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON); - String payload = "{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; - - RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asPOJOMessage")); - ResponseEntity response = testRestTemplate.exchange(re, String.class); - - assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0"); - - re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asPOJO")); - response = testRestTemplate.exchange(re, String.class); - - assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0"); - } - - /* - * This test demonstrates parsing of cloud event out of provided 'datacontenttype' - * using custom message converter which supports imaginary "contentType=foo/bar". - * - */ - @Test - public void testAsBinaryPOJOMessageViaHTTPCustomDataType() throws Exception { - SpringApplication.run(new Class[] {CloudeventDemoApplication.class, FooBarConverterConfiguration.class}, new String[] {}); - - HttpHeaders headers = this.buildHeaders(MediaType.valueOf("application/cloudevents+json;charset=utf-8")); - headers.set(CloudEventMessageUtils.DATACONTENTTYPE, "foo/bar"); - String payload = "24-03-2004:Spring Framework:1.0"; - - RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asPOJOMessage")); - ResponseEntity response = testRestTemplate.exchange(re, String.class); - - assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0"); - } - - /* - * This test demonstrates sending structured - */ - @Test - public void testAsStracturalFormatToPOJO() throws Exception { - SpringApplication.run(CloudeventDemoApplication.class); - - String payload = "{\n" + - " \"specversion\" : \"1.0\",\n" + - " \"type\" : \"org.springframework\",\n" + - " \"source\" : \"https://spring.io/\",\n" + - " \"id\" : \"A234-1234-1234\",\n" + - " \"datacontenttype\" : \"application/json\",\n" + - " \"data\" : {\n" + - " \"version\" : \"1.0\",\n" + - " \"releaseName\" : \"Spring Framework\",\n" + - " \"releaseDate\" : \"24-03-2004\"\n" + - " }\n" + - "}"; - System.out.println(payload); - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.valueOf("application/cloudevents+json;charset=utf-8")); - - RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asPOJOMessage")); - ResponseEntity response = testRestTemplate.exchange(re, String.class); - - assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0"); - - re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asPOJO")); - response = testRestTemplate.exchange(re, String.class); - - assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE)) - .isEqualTo(Collections.singletonList("https://interface21.com/")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE)) - .isEqualTo(Collections.singletonList("com.interface21")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.ID)).isNotNull(); - } - - @Test - public void testAsStracturalFormatToString() throws Exception { - SpringApplication.run(CloudeventDemoApplication.class); - - String payload = "{\n" + - " \"ce-specversion\" : \"1.0\",\n" + - " \"ce-type\" : \"org.springframework\",\n" + - " \"ce-source\" : \"https://spring.io/\",\n" + - " \"ce-id\" : \"A234-1234-1234\",\n" + - " \"ce-datacontenttype\" : \"application/json\",\n" + - " \"ce-data\" : {\n" + - " \"version\" : \"1.0\",\n" + - " \"releaseName\" : \"Spring Framework\",\n" + - " \"releaseDate\" : \"24-03-2004\"\n" + - " }\n" + - "}"; - - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.valueOf("application/cloudevents+json;charset=utf-8")); - - RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asStringMessage")); - ResponseEntity response = testRestTemplate.exchange(re, String.class); - - assertThat(response.getBody()).isEqualTo("{\"version\":\"1.0\",\"releaseName\":\"Spring Framework\",\"releaseDate\":\"24-03-2004\"}"); - - re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asString")); - response = testRestTemplate.exchange(re, String.class); - - assertThat(response.getBody()).isEqualTo("{\"version\":\"1.0\",\"releaseName\":\"Spring Framework\",\"releaseDate\":\"24-03-2004\"}"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE)) - .isEqualTo(Collections.singletonList("https://interface21.com/")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE)) - .isEqualTo(Collections.singletonList("com.interface21")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.ID)).isNotNull(); - } - - @Test - public void testAsBinaryMapToMap() throws Exception { - SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {}); - - HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON); - String payload = "{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; - - RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsMapToMap")); - ResponseEntity response = testRestTemplate.exchange(re, String.class); - - assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2050\",\"releaseName\":\"Spring Framework\",\"version\":\"10.0\"}"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE)) - .isEqualTo(Collections.singletonList("https://interface21.com/")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE)) - .isEqualTo(Collections.singletonList("com.interface21")); - } - - @Test - public void testAsBinaryPojoToPojo() throws Exception { - SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {}); - - HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON); - String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; - - RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsPojoToPojo")); - ResponseEntity response = testRestTemplate.exchange(re, String.class); - - assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE)) - .isEqualTo(Collections.singletonList("https://interface21.com/")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE)) - .isEqualTo(Collections.singletonList("com.interface21")); - } - - - /* - * Typically this would never happen since spec mandates that HTTP uses 'ce-` prefix. - * So this is to primarily validate that we can recognize it process it and still produce correct headers - */ - @Test - public void testAsBinaryPojoToPojoWrongHeaders() throws Exception { - SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {}); - - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); - headers.set(CloudEventMessageUtils.ID, UUID.randomUUID().toString()); - headers.set(CloudEventMessageUtils.SOURCE, "https://spring.io/"); - headers.set(CloudEventMessageUtils.SPECVERSION, "1.0"); - headers.set(CloudEventMessageUtils.TYPE, "org.springframework"); - String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; - - RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsPojoToPojo")); - ResponseEntity response = testRestTemplate.exchange(re, String.class); - - assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE)) - .isEqualTo(Collections.singletonList("https://interface21.com/")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE)) - .isEqualTo(Collections.singletonList("com.interface21")); - } - - - @Test - public void testAsStructuralPojoToPojoDefaultDataContentType() throws Exception { - ApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class); - JsonMapper mapper = context.getBean(JsonMapper.class); - - String payload = "{\n" + - " \"specversion\" : \"1.0\",\n" + - " \"type\" : \"org.springframework\",\n" + - " \"source\" : \"https://spring.io/\",\n" + - " \"id\" : \"A234-1234-1234\",\n" + - " \"data\" : {\n" + - " \"version\" : \"1.0\",\n" + - " \"releaseName\" : \"Spring Framework\",\n" + - " \"releaseDate\" : \"24-03-2004\"\n" + - " }\n" + - "}"; - - - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.valueOf("application/cloudevents+json;charset=utf-8")); - - RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsPojoToPojo")); - ResponseEntity response = testRestTemplate.exchange(re, String.class); - - SpringReleaseEvent springReleaseEvent = mapper.fromJson(response.getBody(), SpringReleaseEvent.class); - - assertThat(springReleaseEvent.getReleaseName()).isEqualTo("Spring Framework"); - assertThat(springReleaseEvent.getVersion()).isEqualTo("2.0"); - - re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsMapToMap")); - response = testRestTemplate.exchange(re, String.class); - - springReleaseEvent = mapper.fromJson(response.getBody(), SpringReleaseEvent.class); - - assertThat(springReleaseEvent.getReleaseName()).isEqualTo("Spring Framework"); - assertThat(springReleaseEvent.getVersion()).isEqualTo("10.0"); - - assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE)) - .isEqualTo(Collections.singletonList("https://interface21.com/")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE)) - .isEqualTo(Collections.singletonList("com.interface21")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.ID)).isNotNull(); - } - - @Test - public void testPojoConsumer() throws Exception { - ApplicationContext context = SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {}); - - HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON); - String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; - - RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/pojoConsumer")); - ResponseEntity response = testRestTemplate.exchange(re, String.class); - assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); - CloudeventDemoApplication application = context.getBean(CloudeventDemoApplication.class); - assertThat(application.consumerSuccess).isTrue(); - } - - private URI constructURI(String path) throws Exception { - return new URI("http://localhost:" + System.getProperty("server.port") + path); - } - - private HttpHeaders buildHeaders(MediaType contentType) { - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(contentType); - headers.set(CloudEventMessageUtils.ID, UUID.randomUUID().toString()); - headers.set(CloudEventMessageUtils.SOURCE, "https://spring.io/"); - headers.set(CloudEventMessageUtils.SPECVERSION, "1.0"); - headers.set(CloudEventMessageUtils.TYPE, "org.springframework"); - return headers; - } - - @Configuration - public static class FooBarConverterConfiguration { - @Bean - public MessageConverter foobar(JsonMapper jsonMapper) { - return new FooBarToCloudEventMessageConverter(jsonMapper); - } - } - - public static class FooBarToCloudEventMessageConverter extends AbstractMessageConverter { - - public FooBarToCloudEventMessageConverter(JsonMapper jsonMapper) { - super(new MimeType("foo", "bar")); - } - - @Override - protected boolean supports(Class clazz) { - throw new UnsupportedOperationException(); - } - - @Override - protected boolean canConvertTo(Object payload, @Nullable MessageHeaders headers) { - if (!supportsMimeType(headers)) { - return false; - } - return true; - } - @Override - protected boolean canConvertFrom(Message message, @Nullable Class targetClass) { - if (targetClass == null || !supportsMimeType(message.getHeaders())) { - return false; - } - else if (message.getHeaders().containsKey(CloudEventMessageUtils.DATACONTENTTYPE) - && message.getHeaders().get(CloudEventMessageUtils.DATACONTENTTYPE).equals("foo/bar")) { - return true; - } - return false; - } - - @Override - protected Object convertFromInternal(Message message, Class targetClass, @Nullable Object conversionHint) { - if (message.getHeaders().containsKey(CloudEventMessageUtils.DATACONTENTTYPE) - && message.getHeaders().get(CloudEventMessageUtils.DATACONTENTTYPE).equals("foo/bar") - && SpringReleaseEvent.class == targetClass) { - SpringReleaseEvent event = new SpringReleaseEvent(); - String[] data = ((String) message.getPayload()).split(":"); - SimpleDateFormat df = new SimpleDateFormat("dd-MM-yyyy"); - try { - event.setReleaseDate(df.parse(data[0].trim())); - } - catch (Exception e) { - throw new IllegalArgumentException("Failed to convert date", e); - } - event.setReleaseName(data[1]); - event.setVersion(data[2]); - return event; - } - else { - return super.convertFromInternal(message, targetClass, conversionHint); - } - } - - @Override - protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, - @Nullable Object conversionHint) { - - return null; - - } - } - -}