GH-422, GH-606 Add support for generating attributes using provider in Consumer
This commit is contained in:
@@ -16,18 +16,28 @@
|
||||
|
||||
package io.spring.cloudevent;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.boot.web.client.RestTemplateBuilder;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventAttributes;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
|
||||
import org.springframework.cloud.function.web.util.HeaderUtils;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.http.RequestEntity;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
/**
|
||||
* Sample application that demonstrates how user functions can be triggered by cloud event.
|
||||
@@ -120,11 +130,33 @@ public class CloudeventDemoApplication {
|
||||
};
|
||||
}
|
||||
|
||||
// @Bean
|
||||
// public Consumer<SpringReleaseEvent> pojoConsumer(CloudEventAttributesProvider provider) {
|
||||
// return event -> {
|
||||
//
|
||||
// provider.generateDefaultCloudEventHeaders(attributes);
|
||||
// };
|
||||
// }
|
||||
@Bean
|
||||
public Consumer<Message<SpringReleaseEvent>> pojoConsumer(CloudEventAttributesProvider provider, RestTemplateBuilder builder) {
|
||||
return eventMessage -> {
|
||||
RequestEntity<SpringReleaseEvent> entity = RequestEntity.post(URI.create("http://foo.com"))
|
||||
.headers(HeaderUtils.fromMessage(
|
||||
new MessageHeaders(CloudEventMessageUtils.generateAttributesWithProvider(eventMessage.getHeaders(), provider))))
|
||||
.body(eventMessage.getPayload());
|
||||
List<String> sourceHeader = entity.getHeaders().get("ce-source");
|
||||
List<String> typeHeader = entity.getHeaders().get("ce-type");
|
||||
Assert.isTrue(sourceHeader.get(0).equals("https://interface21.com/"), "'source' must be https://interface21.com/");
|
||||
Assert.isTrue(typeHeader.get(0).equals("com.interface21"), "'source' must be com.interface21");
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
@ConditionalOnExpression("'${K_SINK:}'!=''")
|
||||
public Consumer<Message<Map<String, Object>>> sink(CloudEventAttributesProvider provider,
|
||||
RestTemplateBuilder builder, @Value("${K_SINK}") String url) {
|
||||
RestTemplate client = builder.build();
|
||||
return eventMessage -> {
|
||||
RequestEntity<Map<String, Object>> entity = RequestEntity.post(URI.create("http://foo.com"))
|
||||
.headers(HeaderUtils.fromMessage(
|
||||
new MessageHeaders(CloudEventMessageUtils.generateAttributesWithProvider(eventMessage.getHeaders(), provider))))
|
||||
.body(eventMessage.getPayload());
|
||||
client.exchange(entity, byte[].class);
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -36,6 +36,7 @@ import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.RequestEntity;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
@@ -314,6 +315,18 @@ public class CloudeventDemoApplicationRESTTests {
|
||||
// .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPojoConsumer() throws Exception {
|
||||
SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {});
|
||||
|
||||
HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON);
|
||||
String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}";
|
||||
|
||||
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/pojoConsumer"));
|
||||
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
|
||||
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
|
||||
}
|
||||
|
||||
private URI constructURI(String path) throws Exception {
|
||||
return new URI("http://localhost:" + System.getProperty("server.port") + path);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user