Modify logic for header copy from input to output

This is primarily related to Cloud Events. Since we delegate to a separate class for post processing, if outpt message is Cloud Event we will not be doing anything to with regard to header copy in SimpleFunctionRegistry and unstead delegate it to CloudEventFunctionInvocationHelper
This commit is contained in:
Oleg Zhurakousky
2020-12-17 12:18:07 +01:00
parent a86e8bd0f0
commit 17d5d4b727
8 changed files with 23 additions and 792 deletions

View File

@@ -8,7 +8,6 @@
<name>function-sample-cloudevent</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
@@ -23,7 +22,6 @@
</properties>
<dependencies>
<!-- REST - only needed if you intend to invoke via HTTP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
@@ -31,33 +29,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-web</artifactId>
<!-- <version>3.1.0-SNAPSHOT</version> -->
</dependency>
<!-- end REST -->
<!-- RSocket - only needed if you intend to invoke via RSocket -->
<!-- <dependency> -->
<!-- <groupId>org.springframework.cloud</groupId> -->
<!-- <artifactId>spring-cloud-function-rsocket</artifactId> -->
<!-- <version>3.1.0-SNAPSHOT</version> -->
<!-- </dependency> -->
<!-- end RSocket -->
<!-- RabbitMQ - only needed if you intend to invoke via RabbitMQ -->
<!-- <dependency> -->
<!-- <groupId>org.springframework.cloud</groupId> -->
<!-- <artifactId>spring-cloud-stream-binder-rabbit</artifactId> -->
<!-- <version>3.1.0-SNAPSHOT</version> -->
<!-- </dependency> -->
<!-- end RabbitMQ -->
<!-- Kafka - only needed if you intend to invoke via RabbitMQ -->
<!-- <dependency> -->
<!-- <groupId>org.springframework.cloud</groupId> -->
<!-- <artifactId>spring-cloud-stream-binder-kafka</artifactId> -->
<!-- <version>3.1.0-SNAPSHOT</version> -->
<!-- </dependency> -->
<!-- end Kafka -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

View File

@@ -1,150 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.spring.cloudevent;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.cloud.function.cloudevent.CloudEventHeaderEnricher;
import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.web.util.HeaderUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.http.RequestEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
/**
* Sample application that demonstrates how user functions can be triggered by cloud event.
* Events can come from anywhere (e.g., HTTP, Messaging, RSocket etc).
* Given that this particular sample comes already with spring-cloud-function-web support each
* function is a valid REST endpoint where function name signifies URL path (e.g., http://localhost:8080/asPOJOMessage).
*
* Simply start the application and post cloud event to individual function - (see individual 'curl' command at each function).
*
* You can also run CloudeventDemoApplicationTests.
*
* @author Oleg Zhurakousky
*
*/
@SpringBootApplication
public class CloudeventDemoApplication {
boolean consumerSuccess;
public static void main(String[] args) throws Exception {
SpringApplication.run(CloudeventDemoApplication.class, args);
}
@Bean
public Function<Message<String>, String> asStringMessage() {
return v -> {
System.out.println("Received Cloud Event with raw data: " + v);
return v.getPayload();
};
}
@Bean
public Function<String, String> asString() {
return v -> {
System.out.println("Received raw Cloud Event data: " + v);
return v;
};
}
@Bean
public Function<Message<SpringReleaseEvent>, String> asPOJOMessage() {
return v -> {
System.out.println("Received Cloud Event with POJO data: " + v);
return v.getPayload().toString();
};
}
@Bean
public Function<SpringReleaseEvent, String> asPOJO() {
return v -> {
System.out.println("Received POJO Cloud Event data: " + v);
return v.toString();
};
}
@Bean
public Function<Message<SpringReleaseEvent>, Message<SpringReleaseEvent>> consumeAndProduceCloudEvent() {
return ceMessage -> {
SpringReleaseEvent data = ceMessage.getPayload();
data.setVersion("2.0");
data.setReleaseDateAsString("01-10-2006");
return MessageBuilder.withPayload(data).build();
};
}
@Bean
public CloudEventHeaderEnricher cloudEventHeaderEnricher() {
return headers -> {
return headers.setSource("https://interface21.com/").setType("com.interface21");
};
}
@Bean
public Function<Map<String, Object>, Map<String, Object>> consumeAndProduceCloudEventAsMapToMap() {
return ceMessage -> {
ceMessage.put("version", "10.0");
ceMessage.put("releaseDate", "01-10-2050");
return ceMessage;
};
}
@Bean
public Function<SpringReleaseEvent, SpringReleaseEvent> consumeAndProduceCloudEventAsPojoToPojo() {
return event -> {
event.setVersion("2.0");
return event;
};
}
@Bean
public Consumer<Message<SpringReleaseEvent>> pojoConsumer(CloudEventHeaderEnricher enricher, RestTemplateBuilder builder) {
return eventMessage -> {
Message<?> newMessage = enricher.enrich(CloudEventMessageBuilder.fromMessage(eventMessage)).build(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX);
RequestEntity<SpringReleaseEvent> entity = RequestEntity.post(URI.create("http://foo.com"))
.headers(HeaderUtils.fromMessage(newMessage.getHeaders()))
.body(eventMessage.getPayload());
List<String> sourceHeader = entity.getHeaders().get("ce-source");
Assert.isTrue(sourceHeader.get(0).equals("https://interface21.com/"), "'source' must be https://interface21.com/");
List<String> typeHeader = entity.getHeaders().get("ce-type");
Assert.isTrue(typeHeader.get(0).equals("com.interface21"), "'source' must be com.interface21");
List<String> idHeader = entity.getHeaders().get("ce-id");
Assert.notEmpty(idHeader, "'id' must not be null");
List<String> specversionHeader = entity.getHeaders().get("ce-specversion");
Assert.notEmpty(specversionHeader, "'specversion' must not be null");
this.consumerSuccess = true;
};
}
}

View File

@@ -1,77 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.spring.cloudevent;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
/**
* An example POJO that represents cloud event data
*
* @author Oleg Zhurakousky
*
*/
public class SpringReleaseEvent {
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy")
private Date releaseDate;
private String releaseName;
private String version;
public Date getReleaseDate() {
return releaseDate;
}
public void setReleaseDate(Date releaseDate) {
this.releaseDate = releaseDate;
}
public void setReleaseDateAsString(String releaseDate) {
try {
this.releaseDate = new SimpleDateFormat("dd-MM-yyyy").parse(releaseDate);
}
catch (ParseException e) {
throw new IllegalArgumentException(e);
}
}
public String getReleaseName() {
return releaseName;
}
public void setReleaseName(String releaseName) {
this.releaseName = releaseName;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
@Override
public String toString() {
return "releaseDate:" + new SimpleDateFormat("dd-MM-yyyy").format(releaseDate) + "; releaseName:" + releaseName + "; version:" + version;
}
}

View File

@@ -1,113 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.spring.cloudevent;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
/**
*
* @author Oleg Zhurakousky
*
*/
public class CloudeventDemoApplicationFunctionTests {
@Test
public void demoPureFunctionInvocation() {
try(ConfigurableApplicationContext context = new SpringApplicationBuilder(CloudeventDemoApplication.class)
.web(WebApplicationType.NONE).run()) {
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
Message<String> inputMessage = CloudEventMessageBuilder
.withData("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}")
.setSource("https://spring.io/spring-event")
.setType("com.example.springevent")
.build();
/*
* NOTE how it makes no difference what the actual function signature
* is (see `asPOJOMessage` and `asPOJO` specifically). Type conversion will happen
* inside spring-cloud-function.
*/
Function<Message<String>, Message<String>> asPojoMessage = catalog.lookup("asPOJOMessage");
System.out.println(asPojoMessage.apply(inputMessage));
Function<Message<String>, Message<String>> asPojo = catalog.lookup("asPOJO");
System.out.println(asPojo.apply(inputMessage));
Function<Message<String>, Message<String>> asString = catalog.lookup("asString");
System.out.println(asString.apply(inputMessage));
Function<Message<String>, Message<String>> asStringMessage = catalog.lookup("asStringMessage");
System.out.println(asStringMessage.apply(inputMessage));
}
}
@Test
public void demoPureFunctionProduceConsumeCloudEvent() {
try(ConfigurableApplicationContext context = new SpringApplicationBuilder(CloudeventDemoApplication.class)
.web(WebApplicationType.NONE).run()) {
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
Message<String> inputMessage = CloudEventMessageBuilder
.withData("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}")
.setSource("https://spring.io/spring-event")
.setType("com.example.springevent")
.build();
/*
* NOTE how it makes no difference what the actual function signature
* is (see `asPOJOMessage` and `asPOJO` specifically). Type conversion will happen
* inside spring-cloud-function.
*/
Function<Message<String>, Message<String>> asPojoMessage = catalog.lookup("consumeAndProduceCloudEvent");
System.out.println(asPojoMessage.apply(inputMessage));
}
}
@Test
public void demoPureFunctionProduceConsumeCloudEventAsPojo() {
try(ConfigurableApplicationContext context = new SpringApplicationBuilder(CloudeventDemoApplication.class)
.web(WebApplicationType.NONE).run()) {
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
Message<String> inputMessage = CloudEventMessageBuilder
.withData("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}")
.setSource("https://spring.io/spring-event")
.setType("com.example.springevent")
.build();
/*
* NOTE how it makes no difference what the actual function signature
* is (see `asPOJOMessage` and `asPOJO` specifically). Type conversion will happen
* inside spring-cloud-function.
*/
Function<Message<String>, Message<String>> asPojoMessage = catalog.lookup("consumeAndProduceCloudEventAsPojoToPojo");
System.out.println(asPojoMessage.apply(inputMessage));
}
}
}

View File

@@ -1,421 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.spring.cloudevent;
import static org.assertj.core.api.Assertions.assertThat;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.MimeType;
import org.springframework.util.SocketUtils;
/**
*
* @author Oleg Zhurakousky
*
*/
public class CloudeventDemoApplicationRESTTests {
private TestRestTemplate testRestTemplate = new TestRestTemplate();
@BeforeEach
public void init() throws Exception {
System.setProperty("server.port", String.valueOf(SocketUtils.findAvailableTcpPort()));
}
/*
* This test demonstrates consumption of Cloud Event via HTTP POST - binary-mode message.
* According to specification - https://github.com/cloudevents/spec/blob/v1.0/spec.md
* - A "binary-mode message" is one where the event data is stored in the message body,
* and event attributes are stored as part of message meta-data.
*
* The above means that it fits perfectly with Spring Message model and as such there is
* absolutely nothing that needs to be done at the framework or user level to consume it.
* It just works!
*
* The example demonstrated via two types of functions
* - Function<Message<String>, String> asBinaryViaMessage;
* - Function<String, String> asJustBinary;
*/
@Test
public void testAsBinaryMessageViaHTTP() throws Exception {
SpringApplication.run(CloudeventDemoApplication.class);
HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON);
// will work with either content type
// HttpHeaders headers = this.buildHeaders(MediaType.valueOf("application/cloudevents+json;charset=utf-8"));
String payload = "{\"releaseDate\":\"2004-03-24\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}";
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asStringMessage"));
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo(payload);
re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asString"));
response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo(payload);
}
/*
* The same as the previous two tests with the exception that cloud event data de-serialized into POJO.
* Again, given that abstractions for transparent type conversion already part of the Spring ecosystem nothing
* needed to be done at the framework or user level to consume it.
* It just works!
*/
@Test
public void testAsBinaryPOJOMessageViaHTTP() throws Exception {
SpringApplication.run(CloudeventDemoApplication.class);
HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON);
String payload = "{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}";
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asPOJOMessage"));
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0");
re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asPOJO"));
response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0");
}
/*
* This test demonstrates parsing of cloud event out of provided 'datacontenttype'
* using custom message converter which supports imaginary "contentType=foo/bar".
*
*/
@Test
public void testAsBinaryPOJOMessageViaHTTPCustomDataType() throws Exception {
SpringApplication.run(new Class[] {CloudeventDemoApplication.class, FooBarConverterConfiguration.class}, new String[] {});
HttpHeaders headers = this.buildHeaders(MediaType.valueOf("application/cloudevents+json;charset=utf-8"));
headers.set(CloudEventMessageUtils.DATACONTENTTYPE, "foo/bar");
String payload = "24-03-2004:Spring Framework:1.0";
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asPOJOMessage"));
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0");
}
/*
* This test demonstrates sending structured
*/
@Test
public void testAsStracturalFormatToPOJO() throws Exception {
SpringApplication.run(CloudeventDemoApplication.class);
String payload = "{\n" +
" \"specversion\" : \"1.0\",\n" +
" \"type\" : \"org.springframework\",\n" +
" \"source\" : \"https://spring.io/\",\n" +
" \"id\" : \"A234-1234-1234\",\n" +
" \"datacontenttype\" : \"application/json\",\n" +
" \"data\" : {\n" +
" \"version\" : \"1.0\",\n" +
" \"releaseName\" : \"Spring Framework\",\n" +
" \"releaseDate\" : \"24-03-2004\"\n" +
" }\n" +
"}";
System.out.println(payload);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.valueOf("application/cloudevents+json;charset=utf-8"));
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asPOJOMessage"));
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0");
re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asPOJO"));
response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0");
assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE))
.isEqualTo(Collections.singletonList("https://interface21.com/"));
assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE))
.isEqualTo(Collections.singletonList("com.interface21"));
assertThat(response.getHeaders().get(CloudEventMessageUtils.ID)).isNotNull();
}
@Test
public void testAsStracturalFormatToString() throws Exception {
SpringApplication.run(CloudeventDemoApplication.class);
String payload = "{\n" +
" \"ce-specversion\" : \"1.0\",\n" +
" \"ce-type\" : \"org.springframework\",\n" +
" \"ce-source\" : \"https://spring.io/\",\n" +
" \"ce-id\" : \"A234-1234-1234\",\n" +
" \"ce-datacontenttype\" : \"application/json\",\n" +
" \"ce-data\" : {\n" +
" \"version\" : \"1.0\",\n" +
" \"releaseName\" : \"Spring Framework\",\n" +
" \"releaseDate\" : \"24-03-2004\"\n" +
" }\n" +
"}";
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.valueOf("application/cloudevents+json;charset=utf-8"));
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asStringMessage"));
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("{\"version\":\"1.0\",\"releaseName\":\"Spring Framework\",\"releaseDate\":\"24-03-2004\"}");
re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asString"));
response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("{\"version\":\"1.0\",\"releaseName\":\"Spring Framework\",\"releaseDate\":\"24-03-2004\"}");
assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE))
.isEqualTo(Collections.singletonList("https://interface21.com/"));
assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE))
.isEqualTo(Collections.singletonList("com.interface21"));
assertThat(response.getHeaders().get(CloudEventMessageUtils.ID)).isNotNull();
}
@Test
public void testAsBinaryMapToMap() throws Exception {
SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {});
HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON);
String payload = "{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}";
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsMapToMap"));
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2050\",\"releaseName\":\"Spring Framework\",\"version\":\"10.0\"}");
assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE))
.isEqualTo(Collections.singletonList("https://interface21.com/"));
assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE))
.isEqualTo(Collections.singletonList("com.interface21"));
}
@Test
public void testAsBinaryPojoToPojo() throws Exception {
SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {});
HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON);
String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}";
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsPojoToPojo"));
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}");
assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE))
.isEqualTo(Collections.singletonList("https://interface21.com/"));
assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE))
.isEqualTo(Collections.singletonList("com.interface21"));
}
/*
* Typically this would never happen since spec mandates that HTTP uses 'ce-` prefix.
* So this is to primarily validate that we can recognize it process it and still produce correct headers
*/
@Test
public void testAsBinaryPojoToPojoWrongHeaders() throws Exception {
SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {});
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set(CloudEventMessageUtils.ID, UUID.randomUUID().toString());
headers.set(CloudEventMessageUtils.SOURCE, "https://spring.io/");
headers.set(CloudEventMessageUtils.SPECVERSION, "1.0");
headers.set(CloudEventMessageUtils.TYPE, "org.springframework");
String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}";
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsPojoToPojo"));
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}");
assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE))
.isEqualTo(Collections.singletonList("https://interface21.com/"));
assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE))
.isEqualTo(Collections.singletonList("com.interface21"));
}
@Test
public void testAsStructuralPojoToPojoDefaultDataContentType() throws Exception {
ApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class);
JsonMapper mapper = context.getBean(JsonMapper.class);
String payload = "{\n" +
" \"specversion\" : \"1.0\",\n" +
" \"type\" : \"org.springframework\",\n" +
" \"source\" : \"https://spring.io/\",\n" +
" \"id\" : \"A234-1234-1234\",\n" +
" \"data\" : {\n" +
" \"version\" : \"1.0\",\n" +
" \"releaseName\" : \"Spring Framework\",\n" +
" \"releaseDate\" : \"24-03-2004\"\n" +
" }\n" +
"}";
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.valueOf("application/cloudevents+json;charset=utf-8"));
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsPojoToPojo"));
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
SpringReleaseEvent springReleaseEvent = mapper.fromJson(response.getBody(), SpringReleaseEvent.class);
assertThat(springReleaseEvent.getReleaseName()).isEqualTo("Spring Framework");
assertThat(springReleaseEvent.getVersion()).isEqualTo("2.0");
re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsMapToMap"));
response = testRestTemplate.exchange(re, String.class);
springReleaseEvent = mapper.fromJson(response.getBody(), SpringReleaseEvent.class);
assertThat(springReleaseEvent.getReleaseName()).isEqualTo("Spring Framework");
assertThat(springReleaseEvent.getVersion()).isEqualTo("10.0");
assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE))
.isEqualTo(Collections.singletonList("https://interface21.com/"));
assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE))
.isEqualTo(Collections.singletonList("com.interface21"));
assertThat(response.getHeaders().get(CloudEventMessageUtils.ID)).isNotNull();
}
@Test
public void testPojoConsumer() throws Exception {
ApplicationContext context = SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {});
HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON);
String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}";
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/pojoConsumer"));
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
CloudeventDemoApplication application = context.getBean(CloudeventDemoApplication.class);
assertThat(application.consumerSuccess).isTrue();
}
private URI constructURI(String path) throws Exception {
return new URI("http://localhost:" + System.getProperty("server.port") + path);
}
private HttpHeaders buildHeaders(MediaType contentType) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(contentType);
headers.set(CloudEventMessageUtils.ID, UUID.randomUUID().toString());
headers.set(CloudEventMessageUtils.SOURCE, "https://spring.io/");
headers.set(CloudEventMessageUtils.SPECVERSION, "1.0");
headers.set(CloudEventMessageUtils.TYPE, "org.springframework");
return headers;
}
@Configuration
public static class FooBarConverterConfiguration {
@Bean
public MessageConverter foobar(JsonMapper jsonMapper) {
return new FooBarToCloudEventMessageConverter(jsonMapper);
}
}
public static class FooBarToCloudEventMessageConverter extends AbstractMessageConverter {
public FooBarToCloudEventMessageConverter(JsonMapper jsonMapper) {
super(new MimeType("foo", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
throw new UnsupportedOperationException();
}
@Override
protected boolean canConvertTo(Object payload, @Nullable MessageHeaders headers) {
if (!supportsMimeType(headers)) {
return false;
}
return true;
}
@Override
protected boolean canConvertFrom(Message<?> message, @Nullable Class<?> targetClass) {
if (targetClass == null || !supportsMimeType(message.getHeaders())) {
return false;
}
else if (message.getHeaders().containsKey(CloudEventMessageUtils.DATACONTENTTYPE)
&& message.getHeaders().get(CloudEventMessageUtils.DATACONTENTTYPE).equals("foo/bar")) {
return true;
}
return false;
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
if (message.getHeaders().containsKey(CloudEventMessageUtils.DATACONTENTTYPE)
&& message.getHeaders().get(CloudEventMessageUtils.DATACONTENTTYPE).equals("foo/bar")
&& SpringReleaseEvent.class == targetClass) {
SpringReleaseEvent event = new SpringReleaseEvent();
String[] data = ((String) message.getPayload()).split(":");
SimpleDateFormat df = new SimpleDateFormat("dd-MM-yyyy");
try {
event.setReleaseDate(df.parse(data[0].trim()));
}
catch (Exception e) {
throw new IllegalArgumentException("Failed to convert date", e);
}
event.setReleaseName(data[1]);
event.setVersion(data[2]);
return event;
}
else {
return super.convertFromInternal(message, targetClass, conversionHint);
}
}
@Override
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers,
@Nullable Object conversionHint) {
return null;
}
}
}