GH-422 GH-606 Add support for normalizing structure-mode CE message
Normalizing in this context means converting it to binary-mode so the rest of the processing logic is the same. Added support for canonical attribute names. Now, internally any attribute can be set as 'ce_' regardless where it came from are where it goes to as the frameork will be able to recognize both Removed CloudEventMessageConverter Renamed CloudEventAttributes to CloudEventAttributesHelperas it is better suited to what it actually does
This commit is contained in:
@@ -21,7 +21,7 @@ import java.util.function.Function;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventAttributes;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventAttributesHelper;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
@@ -90,7 +90,7 @@ public class CloudeventDemoApplication {
|
||||
data.setVersion("2.0");
|
||||
data.setReleaseDateAsString("01-10-2006");
|
||||
|
||||
CloudEventAttributes ceAttributes = CloudEventMessageUtils.get(ceMessage.getHeaders())
|
||||
CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get(ceMessage.getHeaders())
|
||||
.setSource("https://interface21.com/")
|
||||
.setType("com.interface21");
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ import java.net.URI;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
@@ -30,10 +29,7 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventJsonMessageConverter;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
|
||||
import org.springframework.cloud.function.cloudevent.DefaultCloudEventAttributesProvider;
|
||||
import org.springframework.cloud.function.json.JsonMapper;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
@@ -49,7 +45,6 @@ import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.converter.AbstractMessageConverter;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
import org.springframework.util.SocketUtils;
|
||||
|
||||
/**
|
||||
@@ -219,9 +214,9 @@ public class CloudeventDemoApplicationRESTTests {
|
||||
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
|
||||
|
||||
assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2050\",\"releaseName\":\"Spring Framework\",\"version\":\"10.0\"}");
|
||||
assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_SOURCE))
|
||||
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE))
|
||||
.isEqualTo(Collections.singletonList("http://spring.io/application-application"));
|
||||
assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_TYPE))
|
||||
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE))
|
||||
.isEqualTo(Collections.singletonList(LinkedHashMap.class.getName()));
|
||||
}
|
||||
|
||||
@@ -236,12 +231,40 @@ public class CloudeventDemoApplicationRESTTests {
|
||||
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
|
||||
|
||||
assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}");
|
||||
assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_SOURCE))
|
||||
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE))
|
||||
.isEqualTo(Collections.singletonList("http://spring.io/application-application"));
|
||||
assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_TYPE))
|
||||
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE))
|
||||
.isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName()));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Typically this would never happen since spec mandates that HTTP uses 'ce-` prefix.
|
||||
* So this is to primarily validate that we can recognize it process it and still produce correct headers
|
||||
*/
|
||||
@Test
|
||||
public void testAsBinaryPojoToPojoWrongHeaders() throws Exception {
|
||||
SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {});
|
||||
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||
headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.ID, UUID.randomUUID().toString());
|
||||
headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SOURCE, "https://spring.io/");
|
||||
headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION, "1.0");
|
||||
headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.TYPE, "org.springframework");
|
||||
String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}";
|
||||
|
||||
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsPojoToPojo"));
|
||||
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
|
||||
|
||||
assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}");
|
||||
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE))
|
||||
.isEqualTo(Collections.singletonList("http://spring.io/application-application"));
|
||||
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE))
|
||||
.isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName()));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testAsStructuralPojoToPojo() throws Exception {
|
||||
ApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class);
|
||||
@@ -281,9 +304,9 @@ public class CloudeventDemoApplicationRESTTests {
|
||||
assertThat(springReleaseEvent.getVersion()).isEqualTo("10.0");
|
||||
|
||||
|
||||
// assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_SOURCE))
|
||||
// assertThat(response.getHeaders().get(CloudEventMessageUtils.CANONICAL_SOURCE))
|
||||
// .isEqualTo(Collections.singletonList("http://spring.io/application-application"));
|
||||
// assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_TYPE))
|
||||
// assertThat(response.getHeaders().get(CloudEventMessageUtils.CANONICAL_TYPE))
|
||||
// .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName()));
|
||||
}
|
||||
|
||||
@@ -294,10 +317,10 @@ public class CloudeventDemoApplicationRESTTests {
|
||||
private HttpHeaders buildHeaders(MediaType contentType) {
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.setContentType(contentType);
|
||||
headers.set(CloudEventMessageUtils.CE_ID, UUID.randomUUID().toString());
|
||||
headers.set(CloudEventMessageUtils.CE_SOURCE, "https://spring.io/");
|
||||
headers.set(CloudEventMessageUtils.CE_SPECVERSION, "1.0");
|
||||
headers.set(CloudEventMessageUtils.CE_TYPE, "org.springframework");
|
||||
headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID, UUID.randomUUID().toString());
|
||||
headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE, "https://spring.io/");
|
||||
headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION, "1.0");
|
||||
headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE, "org.springframework");
|
||||
return headers;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user