From ed50d7c2526ec6f393021df9bc86fcea434ecbab Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 27 Sep 2021 18:03:32 +0200 Subject: [PATCH] GH-746 Fix support for Cloud Event properly enriching Function Resolves #746 --- .../catalog/SimpleFunctionRegistry.java | 43 +++++++++++++--- .../cloudevent/CloudEventFunctionTests.java | 51 +++++++++++++++++++ 2 files changed, 87 insertions(+), 7 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index e49d3d9b0..21ef6fc50 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -841,13 +841,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } Object result = ((Function) this.target).apply(inputValue); - if (result instanceof Flux && functionInvocationHelper != null) { - result = ((Flux) result).map(v -> { - if (firstInputMessage.get() != null && CloudEventMessageUtils.isCloudEvent(firstInputMessage.get())) { - return functionInvocationHelper.postProcessResult(v, firstInputMessage.get()); - } - return v; - }); + if (result instanceof Publisher && functionInvocationHelper != null) { + result = this.postProcessFunction((Publisher) result, firstInputMessage); } return value instanceof OriginalMessageHolder @@ -855,6 +850,40 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect : result; } + @SuppressWarnings("unchecked") + private Publisher postProcessFunction(Publisher result, AtomicReference> firstInputMessage) { + if (FunctionTypeUtils.isPublisher(this.inputType) && FunctionTypeUtils.isPublisher(this.outputType)) { + if (!FunctionTypeUtils.getRawType(FunctionTypeUtils.getImmediateGenericType(this.inputType, 0)) + .isAssignableFrom(Void.class) + && !FunctionTypeUtils.getRawType(FunctionTypeUtils.getImmediateGenericType(this.outputType, 0)) + .isAssignableFrom(Void.class)) { + + if (result instanceof Mono) { + return Mono.from((result)).map(v -> { + if (firstInputMessage.get() != null && CloudEventMessageUtils + .isCloudEvent(firstInputMessage.get())) { + return functionInvocationHelper.postProcessResult(v, + firstInputMessage.get()); + } + return v; + }); + } + else { + return Flux.from((result)).map(v -> { + if (firstInputMessage.get() != null && CloudEventMessageUtils + .isCloudEvent(firstInputMessage.get())) { + return functionInvocationHelper.postProcessResult(v, + firstInputMessage.get()); + } + return v; + }); + } + } + } + + return result; + } + /* * */ diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java index ef0095767..388b0f057 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java @@ -23,6 +23,7 @@ import java.util.function.Function; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; @@ -153,6 +154,42 @@ public class CloudEventFunctionTests { assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/")); } + @SuppressWarnings("unchecked") + @Test + public void testBinaryPojoToPojoDefaultOutputHeaderProviderReactiveMono() { + Function function = this.lookup("springReleaseReactiveMono", TestConfiguration.class); + + String id = UUID.randomUUID().toString(); + + String payload = "{\n" + + " \"version\" : \"1.0\",\n" + + " \"releaseName\" : \"Spring Framework\",\n" + + " \"releaseDate\" : \"24-03-2004\"\n" + + " }"; + + Message inputMessage = CloudEventMessageBuilder + .withData(payload) + .setId(id) + .setSource("https://spring.io/") + .setType("org.springframework") + .setHeader(MessageUtils.TARGET_PROTOCOL, CloudEventMessageUtils.Protocols.KAFKA) + .build(CloudEventMessageUtils.AMQP_ATTR_PREFIX); + + assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue(); + + Message message = ((Mono>) function.apply(Mono.just(inputMessage))).block(); + + /* + * Validates that although user only deals with POJO, the framework recognizes + * both on input and output that it is dealing with Cloud Event and generates + * appropriate headers/attributes + */ + + assertThat(CloudEventMessageUtils.isCloudEvent(message)).isTrue(); + assertThat(CloudEventMessageUtils.getType(message)).isEqualTo(SpringReleaseEvent.class.getName()); + assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/")); + assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/")); + } // this kind of emulates that message came from Kafka @@ -336,6 +373,20 @@ public class CloudEventFunctionTests { }); } + @Bean + Function, Mono> springReleaseReactiveMono() { + return mono -> mono.map(event -> { + try { + event.setReleaseDate(new SimpleDateFormat("dd-MM-yyyy").parse("01-10-2006")); + event.setVersion("2.0"); + return event; + } + catch (Exception e) { + throw new IllegalArgumentException(e); + } + }); + } + @Bean Function, Message> springReleaseAsMessage() { return message -> {