From 8993a9751fe365164fa7c6093e7df170495c2245 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 16 Nov 2020 16:27:54 +0100 Subject: [PATCH] GH-422, GH-606 Add support for generating attributes using provider in Consumer --- .../cloudevent/CloudEventMessageUtils.java | 6 +++ .../cloudevent/CloudeventDemoApplication.java | 46 ++++++++++++++++--- .../CloudeventDemoApplicationRESTTests.java | 13 ++++++ 3 files changed, 58 insertions(+), 7 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 17060eecb..e16cb5929 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -286,6 +286,12 @@ public final class CloudEventMessageUtils { } } + public static CloudEventAttributes generateAttributesWithProvider(MessageHeaders headers, CloudEventAttributesProvider provider) { + CloudEventAttributes attributes = new CloudEventAttributes(headers); + provider.generateDefaultCloudEventHeaders(attributes); + return attributes; + } + public static CloudEventAttributes generateAttributes(Message inputMessage, Object result, String applicationName) { CloudEventAttributes attributes = new CloudEventAttributes(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage)); if (attributes.isValidCloudEvent()) { diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java index 88f1a6e0d..f05205598 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java @@ -16,18 +16,28 @@ package io.spring.cloudevent; +import java.net.URI; +import java.util.List; import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.cloud.function.cloudevent.CloudEventAttributes; import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; +import org.springframework.cloud.function.web.util.HeaderUtils; import org.springframework.context.annotation.Bean; +import org.springframework.http.RequestEntity; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; +import org.springframework.web.client.RestTemplate; /** * Sample application that demonstrates how user functions can be triggered by cloud event. @@ -120,11 +130,33 @@ public class CloudeventDemoApplication { }; } -// @Bean -// public Consumer pojoConsumer(CloudEventAttributesProvider provider) { -// return event -> { -// -// provider.generateDefaultCloudEventHeaders(attributes); -// }; -// } + @Bean + public Consumer> pojoConsumer(CloudEventAttributesProvider provider, RestTemplateBuilder builder) { + return eventMessage -> { + RequestEntity entity = RequestEntity.post(URI.create("http://foo.com")) + .headers(HeaderUtils.fromMessage( + new MessageHeaders(CloudEventMessageUtils.generateAttributesWithProvider(eventMessage.getHeaders(), provider)))) + .body(eventMessage.getPayload()); + List sourceHeader = entity.getHeaders().get("ce-source"); + List typeHeader = entity.getHeaders().get("ce-type"); + Assert.isTrue(sourceHeader.get(0).equals("https://interface21.com/"), "'source' must be https://interface21.com/"); + Assert.isTrue(typeHeader.get(0).equals("com.interface21"), "'source' must be com.interface21"); + }; + } + + + @Bean + @ConditionalOnExpression("'${K_SINK:}'!=''") + public Consumer>> sink(CloudEventAttributesProvider provider, + RestTemplateBuilder builder, @Value("${K_SINK}") String url) { + RestTemplate client = builder.build(); + return eventMessage -> { + RequestEntity> entity = RequestEntity.post(URI.create("http://foo.com")) + .headers(HeaderUtils.fromMessage( + new MessageHeaders(CloudEventMessageUtils.generateAttributesWithProvider(eventMessage.getHeaders(), provider)))) + .body(eventMessage.getPayload()); + client.exchange(entity, byte[].class); + }; + } + } diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java index 3aaf49671..288d33885 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java @@ -36,6 +36,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; @@ -314,6 +315,18 @@ public class CloudeventDemoApplicationRESTTests { // .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName())); } + @Test + public void testPojoConsumer() throws Exception { + SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {}); + + HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON); + String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; + + RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/pojoConsumer")); + ResponseEntity response = testRestTemplate.exchange(re, String.class); + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + } + private URI constructURI(String path) throws Exception { return new URI("http://localhost:" + System.getProperty("server.port") + path); }