GH-746 Fix support for Cloud Event properly enriching Function<Mono, Mono>

Resolves #746
This commit is contained in:
Oleg Zhurakousky
2021-09-27 18:03:32 +02:00
parent 03babc429d
commit ed50d7c252
2 changed files with 87 additions and 7 deletions

View File

@@ -841,13 +841,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
}
Object result = ((Function) this.target).apply(inputValue);
if (result instanceof Flux && functionInvocationHelper != null) {
result = ((Flux) result).map(v -> {
if (firstInputMessage.get() != null && CloudEventMessageUtils.isCloudEvent(firstInputMessage.get())) {
return functionInvocationHelper.postProcessResult(v, firstInputMessage.get());
}
return v;
});
if (result instanceof Publisher && functionInvocationHelper != null) {
result = this.postProcessFunction((Publisher) result, firstInputMessage);
}
return value instanceof OriginalMessageHolder
@@ -855,6 +850,40 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
: result;
}
@SuppressWarnings("unchecked")
private Publisher postProcessFunction(Publisher result, AtomicReference<Message<?>> firstInputMessage) {
if (FunctionTypeUtils.isPublisher(this.inputType) && FunctionTypeUtils.isPublisher(this.outputType)) {
if (!FunctionTypeUtils.getRawType(FunctionTypeUtils.getImmediateGenericType(this.inputType, 0))
.isAssignableFrom(Void.class)
&& !FunctionTypeUtils.getRawType(FunctionTypeUtils.getImmediateGenericType(this.outputType, 0))
.isAssignableFrom(Void.class)) {
if (result instanceof Mono) {
return Mono.from((result)).map(v -> {
if (firstInputMessage.get() != null && CloudEventMessageUtils
.isCloudEvent(firstInputMessage.get())) {
return functionInvocationHelper.postProcessResult(v,
firstInputMessage.get());
}
return v;
});
}
else {
return Flux.from((result)).map(v -> {
if (firstInputMessage.get() != null && CloudEventMessageUtils
.isCloudEvent(firstInputMessage.get())) {
return functionInvocationHelper.postProcessResult(v,
firstInputMessage.get());
}
return v;
});
}
}
}
return result;
}
/*
*
*/

View File

@@ -23,6 +23,7 @@ import java.util.function.Function;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
@@ -153,6 +154,42 @@ public class CloudEventFunctionTests {
assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/"));
}
@SuppressWarnings("unchecked")
@Test
public void testBinaryPojoToPojoDefaultOutputHeaderProviderReactiveMono() {
Function<Object, Object> function = this.lookup("springReleaseReactiveMono", TestConfiguration.class);
String id = UUID.randomUUID().toString();
String payload = "{\n" +
" \"version\" : \"1.0\",\n" +
" \"releaseName\" : \"Spring Framework\",\n" +
" \"releaseDate\" : \"24-03-2004\"\n" +
" }";
Message<String> inputMessage = CloudEventMessageBuilder
.withData(payload)
.setId(id)
.setSource("https://spring.io/")
.setType("org.springframework")
.setHeader(MessageUtils.TARGET_PROTOCOL, CloudEventMessageUtils.Protocols.KAFKA)
.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);
assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue();
Message<?> message = ((Mono<Message<?>>) function.apply(Mono.just(inputMessage))).block();
/*
* Validates that although user only deals with POJO, the framework recognizes
* both on input and output that it is dealing with Cloud Event and generates
* appropriate headers/attributes
*/
assertThat(CloudEventMessageUtils.isCloudEvent(message)).isTrue();
assertThat(CloudEventMessageUtils.getType(message)).isEqualTo(SpringReleaseEvent.class.getName());
assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/"));
assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/"));
}
// this kind of emulates that message came from Kafka
@@ -336,6 +373,20 @@ public class CloudEventFunctionTests {
});
}
@Bean
Function<Mono<SpringReleaseEvent>, Mono<SpringReleaseEvent>> springReleaseReactiveMono() {
return mono -> mono.map(event -> {
try {
event.setReleaseDate(new SimpleDateFormat("dd-MM-yyyy").parse("01-10-2006"));
event.setVersion("2.0");
return event;
}
catch (Exception e) {
throw new IllegalArgumentException(e);
}
});
}
@Bean
Function<Message<SpringReleaseEvent>, Message<SpringReleaseEvent>> springReleaseAsMessage() {
return message -> {