Add stream (kafka, Rabbit) and Rsocket examples for Cloud Event

This commit is contained in:
Oleg Zhurakousky
2020-12-18 16:33:48 +01:00
parent 17d5d4b727
commit 8ece3d3083
41 changed files with 2277 additions and 163 deletions

View File

@@ -178,13 +178,13 @@ public final class CloudEventMessageBuilder<T> {
else if (key.startsWith(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)) {
this.swapPrefix(key, CloudEventMessageUtils.KAFKA_ATTR_PREFIX, attributePrefixToUse);
}
else if (key.equals(CloudEventMessageUtils._ID) || key.equals(CloudEventMessageUtils._SPECVERSION) ||
key.equals(CloudEventMessageUtils._SOURCE) || key.equals(CloudEventMessageUtils._TYPE) ||
key.equals(CloudEventMessageUtils._DATASCHEMA) || key.equals(CloudEventMessageUtils._SCHEMAURL) ||
key.equals(CloudEventMessageUtils._SUBJECT) || key.equals(CloudEventMessageUtils._TIME) ||
key.equals(CloudEventMessageUtils._DATACONTENTTYPE)) {
this.swapPrefix(key, "", attributePrefixToUse);
}
// else if (key.equals(CloudEventMessageUtils._SPECVERSION) ||
// key.equals(CloudEventMessageUtils._SOURCE) || key.equals(CloudEventMessageUtils._TYPE) ||
// key.equals(CloudEventMessageUtils._DATASCHEMA) || key.equals(CloudEventMessageUtils._SCHEMAURL) ||
// key.equals(CloudEventMessageUtils._SUBJECT) || key.equals(CloudEventMessageUtils._TIME) ||
// key.equals(CloudEventMessageUtils._DATACONTENTTYPE)) {
// this.swapPrefix(key, "", attributePrefixToUse);
// }
}
}
return doBuild(attributePrefixToUse);
@@ -209,7 +209,7 @@ public final class CloudEventMessageBuilder<T> {
this.headers.put(prefix + CloudEventMessageUtils._TYPE, this.data.getClass().getName());
}
if (!this.headers.containsKey(prefix + CloudEventMessageUtils._SOURCE)) {
this.headers.put(prefix + CloudEventMessageUtils._SOURCE, URI.create("https://spring.io/" + this.data.getClass().getName()));
this.headers.put(prefix + CloudEventMessageUtils._SOURCE, URI.create("https://spring.io/"));
}
MessageHeaders headers = new MessageHeaders(this.headers);
GenericMessage<T> message = new GenericMessage<T>(this.data, headers);

View File

@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.ContentTypeResolver;
@@ -47,7 +48,17 @@ import org.springframework.util.StringUtils;
*/
public final class CloudEventMessageUtils {
private static final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver();
private static final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver() {
@Override
public MimeType resolve(@Nullable MessageHeaders headers) {
if (headers.containsKey("content-type")) { // this is temporary workaround for RSocket
return MimeType.valueOf(headers.get("content-type").toString());
}
return super.resolve(headers);
}
};
private static Field MESSAGE_HEADERS = ReflectionUtils.findField(MessageHeaders.class, "headers");
@@ -235,7 +246,7 @@ public final class CloudEventMessageUtils {
String dataContentType = StringUtils.hasText(inputContentType) ? inputContentType
: MimeTypeUtils.APPLICATION_JSON_VALUE;
String suffix = contentType.getSubtypeSuffix();
String suffix = contentType.getSubtypeSuffix() == null ? "json" : contentType.getSubtypeSuffix();
MimeType cloudEventDeserializationContentType = MimeTypeUtils
.parseMimeType(contentType.getType() + "/" + suffix);
Message<?> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)

View File

@@ -88,7 +88,12 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
Message<?> resultMessage = null; //result instanceof Message ? (Message<?>) result : null;
CloudEventMessageBuilder<?> messageBuilder;
if (result instanceof Message) {
messageBuilder = CloudEventMessageBuilder.fromMessage((Message<?>) result);
if (CloudEventMessageUtils.isCloudEvent((Message<?>) result)) {
messageBuilder = CloudEventMessageBuilder.fromMessage((Message<?>) result);
}
else {
return (Message<?>) result;
}
}
else {
messageBuilder = CloudEventMessageBuilder
@@ -109,6 +114,6 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
private String getApplicationName() {
ConfigurableEnvironment environment = this.applicationContext.getEnvironment();
String name = environment.getProperty("spring.application.name");
return (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId());
return (StringUtils.hasText(name) ? name : "");
}
}

View File

@@ -72,7 +72,7 @@ public class CloudEventFunctionTests {
*/
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/"));
}
/*
@@ -112,8 +112,8 @@ public class CloudEventFunctionTests {
assertThat(CloudEventMessageUtils.isCloudEvent(message)).isTrue();
assertThat(CloudEventMessageUtils.getType(message)).isEqualTo(SpringReleaseEvent.class.getName());
assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/application-application"));
assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/application-application"));
assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/"));
assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/"));
}
@SuppressWarnings("unchecked")
@@ -149,8 +149,8 @@ public class CloudEventFunctionTests {
assertThat(CloudEventMessageUtils.isCloudEvent(message)).isTrue();
assertThat(CloudEventMessageUtils.getType(message)).isEqualTo(SpringReleaseEvent.class.getName());
assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/application-application"));
assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/application-application"));
assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/"));
assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/"));
}
@@ -179,7 +179,7 @@ public class CloudEventFunctionTests {
*/
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/"));
}
@SuppressWarnings("unchecked")
@@ -217,7 +217,7 @@ public class CloudEventFunctionTests {
// */
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/"));
}
@SuppressWarnings("unchecked")
@@ -291,7 +291,7 @@ public class CloudEventFunctionTests {
*/
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/"));
}
private Function<Object, Object> lookup(String functionDefinition, Class<?>... configClass) {