Cleaned up Cloud Event support around reactive functions

Added reactive tests
Restructured CloudEventsFunctionInvocationHelper
This commit is contained in:
Oleg Zhurakousky
2020-12-11 14:34:32 +01:00
parent 096df2f22c
commit 30febe6bce
6 changed files with 206 additions and 55 deletions

View File

@@ -199,10 +199,8 @@ public final class CloudEventMessageBuilder<T> {
this.headers.put(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE);
MessageHeaders headers = new MessageHeaders(this.headers);
GenericMessage<T> message = new GenericMessage<T>(this.data, headers);
Assert.hasText(CloudEventMessageUtils.getSpecVersion(message), "'specversion' must not be null or empty");
Assert.notNull(CloudEventMessageUtils.getSource(message), "'source' must not be null");
Assert.hasText(CloudEventMessageUtils.getType(message), "'type' must not be null or empty");
Assert.hasText(CloudEventMessageUtils.getId(message), "'id' must not be null or empty");
Assert.isTrue(CloudEventMessageUtils.isCloudEvent(message), "The message does not appear to be a valid Cloud Event, "
+ "since one of the required attributes (id, specversion, type, source) is missing");
return message;
}
}

View File

@@ -265,20 +265,9 @@ public final class CloudEventMessageUtils {
*/
static String determinePrefixToUse(Map<String, Object> messageHeaders) {
String targetProtocol = (String) messageHeaders.get(MessageUtils.TARGET_PROTOCOL);
if (StringUtils.hasText(targetProtocol)) {
if ("kafka".equals(targetProtocol)) {
return CloudEventMessageUtils.KAFKA_ATTR_PREFIX;
}
else if ("amqp".equals(targetProtocol)) {
return CloudEventMessageUtils.AMQP_ATTR_PREFIX;
}
else if ("http".equals(targetProtocol)) {
return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX;
}
else {
throw new IllegalArgumentException("Provided TARGET_PROTOCOL is not suported: " + targetProtocol + ". "
+ "Supported protoclos are, 'kafka', 'amqp' and 'http'");
}
String prefix = determinePrefixToUse(targetProtocol);
if (StringUtils.hasText(prefix)) {
return prefix;
}
else {
for (String key : messageHeaders.keySet()) {
@@ -297,16 +286,44 @@ public final class CloudEventMessageUtils {
return "";
}
/**
* Determines attribute prefix based on the provided target protocol.
* @param targetProtocol target protocol (see {@link MessageUtils#TARGET_PROTOCOL}
* @return prefix (e.g., 'ce_' or 'ce-' etc.)
*/
static String determinePrefixToUse(String targetProtocol) {
if (StringUtils.hasText(targetProtocol)) {
if (Protocols.KAFKA.equals(targetProtocol)) {
return CloudEventMessageUtils.KAFKA_ATTR_PREFIX;
}
else if (Protocols.AMQP.equals(targetProtocol)) {
return CloudEventMessageUtils.AMQP_ATTR_PREFIX;
}
else if (Protocols.HTTP.equals(targetProtocol)) {
return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX;
}
}
return "";
}
/**
* Will check for the existence of required attributes. Assumes attributes (headers)
* are in canonical form.
* @param message input {@link Message}
* @return true if this Message represents Cloud Event in binary-mode
*/
static boolean isCloudEvent(Message<?> message) {
return message.getHeaders().containsKey(SPECVERSION)
&& message.getHeaders().containsKey(TYPE)
&& message.getHeaders().containsKey(SOURCE);
public static boolean isCloudEvent(Message<?> message) {
return (message.getHeaders().containsKey(SPECVERSION)
&& message.getHeaders().containsKey(TYPE)
&& message.getHeaders().containsKey(SOURCE))
||
(message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SPECVERSION)
&& message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _TYPE)
&& message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SOURCE))
||
(message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _SPECVERSION)
&& message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _TYPE)
&& message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _SOURCE));
}
private static boolean isAttribute(String key) {
@@ -376,4 +393,13 @@ public final class CloudEventMessageUtils {
}
return (URI) uri;
}
public static class Protocols {
static String AMQP = "amqp";
static String AVRO = "avro";
static String HTTP = "http";
static String JSON = "json";
static String KAFKA = "kafka";
}
}

View File

@@ -30,7 +30,6 @@ import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
/**
@@ -59,6 +58,7 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
@Override
public Message<?> preProcessInput(Message<?> input, Object inputConverter) {
// TODO find a way to invoke it conditionally. May be check for certain headers with all known prefixes as well as content type
try {
return CloudEventMessageUtils.toCanonical(input, (MessageConverter) inputConverter);
}
@@ -68,32 +68,15 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
}
@Override
public Message<?> postProcessResult(Message<?> input, Object result) {
Message<?> resultMessage = result instanceof Message ? (Message<?>) result : null;
if (CloudEventMessageUtils.isCloudEvent(input)) {
CloudEventMessageBuilder<?> messageBuilder;
if (result instanceof Message) {
messageBuilder = CloudEventMessageBuilder.fromMessage((Message<?>) result);
}
else {
messageBuilder = CloudEventMessageBuilder
.withData(result)
.setId(UUID.randomUUID().toString())
.setSource(URI.create("http://spring.io/" + getApplicationName()))
.setType(result.getClass().getName());
}
public Message<?> postProcessResult(Object result, Message<?> input) {
String targetPrefix = CloudEventMessageUtils.determinePrefixToUse(input.getHeaders());
return this.doPostProcessResult(result, targetPrefix);
}
if (this.cloudEventAttributesProvider != null) {
messageBuilder = this.cloudEventAttributesProvider.enrich(messageBuilder);
}
resultMessage = messageBuilder.build(CloudEventMessageUtils.determinePrefixToUse(input.getHeaders()));
}
else if (!(result instanceof Message<?>)) {
resultMessage = MessageBuilder.withPayload(result).build();
}
return resultMessage;
@Override
public Message<?> postProcessResult(Object result, String targetProtocol) {
String targetPrefix = CloudEventMessageUtils.determinePrefixToUse(targetProtocol);
return this.doPostProcessResult(result, targetPrefix);
}
@Override
@@ -101,6 +84,27 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
private Message<?> doPostProcessResult(Object result, String targetPrefix) {
Message<?> resultMessage = null; //result instanceof Message ? (Message<?>) result : null;
CloudEventMessageBuilder<?> messageBuilder;
if (result instanceof Message) {
messageBuilder = CloudEventMessageBuilder.fromMessage((Message<?>) result);
}
else {
messageBuilder = CloudEventMessageBuilder
.withData(result)
.setId(UUID.randomUUID().toString())
.setSource(URI.create("http://spring.io/" + getApplicationName()))
.setType(result.getClass().getName());
}
if (this.cloudEventAttributesProvider != null) {
messageBuilder = this.cloudEventAttributesProvider.enrich(messageBuilder);
}
resultMessage = messageBuilder.build(targetPrefix);
return resultMessage;
}
private String getApplicationName() {
ConfigurableEnvironment environment = this.applicationContext.getEnvironment();

View File

@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -45,6 +46,7 @@ import reactor.util.function.Tuples;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistration;
@@ -621,13 +623,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
.getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders());
this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v));
if (functionInvocationHelper != null) {
result = functionInvocationHelper.postProcessResult((Message<?>) input, result);
if (functionInvocationHelper != null && CloudEventMessageUtils.isCloudEvent(((Message) input))) {
result = functionInvocationHelper.postProcessResult(result, (Message) input);
}
}
else {
if (functionInvocationHelper != null) {
result = functionInvocationHelper.postProcessResult((Message<?>) input, result);
if (functionInvocationHelper != null && CloudEventMessageUtils.isCloudEvent(((Message) input))) {
result = functionInvocationHelper.postProcessResult(result, (Message) input);
}
else {
result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build();
@@ -694,12 +696,24 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
*/
@SuppressWarnings("unchecked")
private Object invokeFunctionAndEnrichResultIfNecessary(Object value) {
AtomicReference<Message<?>> firstInputMessage = new AtomicReference<>();
Object inputValue;
if (value instanceof Flux) {
inputValue = ((Flux) value).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v));
inputValue = ((Flux) value).map(v -> {
if (v instanceof OriginalMessageHolder && firstInputMessage.get() == null) {
firstInputMessage.set(((OriginalMessageHolder) v).getOriginalMessage());
}
return this.extractValueFromOriginalValueHolderIfNecessary(v);
});
}
else if (value instanceof Mono) {
inputValue = ((Mono) value).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v));
inputValue = ((Mono) value).map(v -> {
if (v instanceof OriginalMessageHolder) {
firstInputMessage.set(((OriginalMessageHolder) v).getOriginalMessage());
}
return this.extractValueFromOriginalValueHolderIfNecessary(v);
});
}
else {
inputValue = this.extractValueFromOriginalValueHolderIfNecessary(value);
@@ -710,6 +724,15 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
}
Object result = ((Function) this.target).apply(inputValue);
if (result instanceof Flux && functionInvocationHelper != null) {
result = ((Flux) result).map(v -> {
if (firstInputMessage.get() != null && CloudEventMessageUtils.isCloudEvent(firstInputMessage.get())) {
return functionInvocationHelper.postProcessResult(v, firstInputMessage.get());
}
return v;
});
}
return value instanceof OriginalMessageHolder
? this.enrichInvocationResultIfNecessary(((OriginalMessageHolder) value).getOriginalMessage(), result)
: result;

View File

@@ -22,10 +22,12 @@ import java.util.UUID;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -33,6 +35,8 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
/**
@@ -71,6 +75,86 @@ public class CloudEventFunctionTests {
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}
/*
* Aside from the properly processing and recognizing CE, the following tow tests (imperative and reactive)
* also emulate message coming from one protocol going to another via MessageUtils.TARGET_PROTOCOL header that
* is set here explicitly but for instance in s-c-stream is set by the framework
*/
@Test
public void testBinaryPojoToPojoDefaultOutputHeaderProviderImperative() {
Function<Object, Object> function = this.lookup("springRelease", TestConfiguration.class);
String id = UUID.randomUUID().toString();
String payload = "{\n" +
" \"version\" : \"1.0\",\n" +
" \"releaseName\" : \"Spring Framework\",\n" +
" \"releaseDate\" : \"24-03-2004\"\n" +
" }";
Message<String> inputMessage = CloudEventMessageBuilder
.withData(payload)
.setId(id)
.setSource("https://spring.io/")
.setType("org.springframework")
.setHeader(MessageUtils.TARGET_PROTOCOL, CloudEventMessageUtils.Protocols.KAFKA)
.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);
assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue();
Message<?> message = (Message<?>) function.apply(inputMessage);
/*
* Validates that although user only deals with POJO, the framework recognizes
* both on input and output that it is dealing with Cloud Event and generates
* appropriate headers/attributes
*/
assertThat(CloudEventMessageUtils.isCloudEvent(message)).isTrue();
assertThat(CloudEventMessageUtils.getType(message)).isEqualTo(SpringReleaseEvent.class.getName());
assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/application-application"));
assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/application-application"));
}
@SuppressWarnings("unchecked")
@Test
public void testBinaryPojoToPojoDefaultOutputHeaderProviderReactive() {
Function<Object, Object> function = this.lookup("springReleaseReactive", TestConfiguration.class);
String id = UUID.randomUUID().toString();
String payload = "{\n" +
" \"version\" : \"1.0\",\n" +
" \"releaseName\" : \"Spring Framework\",\n" +
" \"releaseDate\" : \"24-03-2004\"\n" +
" }";
Message<String> inputMessage = CloudEventMessageBuilder
.withData(payload)
.setId(id)
.setSource("https://spring.io/")
.setType("org.springframework")
.setHeader(MessageUtils.TARGET_PROTOCOL, CloudEventMessageUtils.Protocols.KAFKA)
.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);
assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue();
Message<?> message = ((Flux<Message<?>>) function.apply(Flux.just(inputMessage))).blockFirst();
/*
* Validates that although user only deals with POJO, the framework recognizes
* both on input and output that it is dealing with Cloud Event and generates
* appropriate headers/attributes
*/
assertThat(CloudEventMessageUtils.isCloudEvent(message)).isTrue();
assertThat(CloudEventMessageUtils.getType(message)).isEqualTo(SpringReleaseEvent.class.getName());
assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/application-application"));
assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/application-application"));
}
// this kind of emulates that message came from Kafka
@SuppressWarnings("unchecked")
@Test
@@ -238,6 +322,20 @@ public class CloudEventFunctionTests {
};
}
@Bean
Function<Flux<SpringReleaseEvent>, Flux<SpringReleaseEvent>> springReleaseReactive() {
return flux -> flux.map(event -> {
try {
event.setReleaseDate(new SimpleDateFormat("dd-MM-yyyy").parse("01-10-2006"));
event.setVersion("2.0");
return event;
}
catch (Exception e) {
throw new IllegalArgumentException(e);
}
});
}
@Bean
Function<Message<SpringReleaseEvent>, Message<SpringReleaseEvent>> springReleaseAsMessage() {
return message -> {