Add stream (kafka, Rabbit) and Rsocket examples for Cloud Event
This commit is contained in:
@@ -178,13 +178,13 @@ public final class CloudEventMessageBuilder<T> {
|
||||
else if (key.startsWith(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)) {
|
||||
this.swapPrefix(key, CloudEventMessageUtils.KAFKA_ATTR_PREFIX, attributePrefixToUse);
|
||||
}
|
||||
else if (key.equals(CloudEventMessageUtils._ID) || key.equals(CloudEventMessageUtils._SPECVERSION) ||
|
||||
key.equals(CloudEventMessageUtils._SOURCE) || key.equals(CloudEventMessageUtils._TYPE) ||
|
||||
key.equals(CloudEventMessageUtils._DATASCHEMA) || key.equals(CloudEventMessageUtils._SCHEMAURL) ||
|
||||
key.equals(CloudEventMessageUtils._SUBJECT) || key.equals(CloudEventMessageUtils._TIME) ||
|
||||
key.equals(CloudEventMessageUtils._DATACONTENTTYPE)) {
|
||||
this.swapPrefix(key, "", attributePrefixToUse);
|
||||
}
|
||||
// else if (key.equals(CloudEventMessageUtils._SPECVERSION) ||
|
||||
// key.equals(CloudEventMessageUtils._SOURCE) || key.equals(CloudEventMessageUtils._TYPE) ||
|
||||
// key.equals(CloudEventMessageUtils._DATASCHEMA) || key.equals(CloudEventMessageUtils._SCHEMAURL) ||
|
||||
// key.equals(CloudEventMessageUtils._SUBJECT) || key.equals(CloudEventMessageUtils._TIME) ||
|
||||
// key.equals(CloudEventMessageUtils._DATACONTENTTYPE)) {
|
||||
// this.swapPrefix(key, "", attributePrefixToUse);
|
||||
// }
|
||||
}
|
||||
}
|
||||
return doBuild(attributePrefixToUse);
|
||||
@@ -209,7 +209,7 @@ public final class CloudEventMessageBuilder<T> {
|
||||
this.headers.put(prefix + CloudEventMessageUtils._TYPE, this.data.getClass().getName());
|
||||
}
|
||||
if (!this.headers.containsKey(prefix + CloudEventMessageUtils._SOURCE)) {
|
||||
this.headers.put(prefix + CloudEventMessageUtils._SOURCE, URI.create("https://spring.io/" + this.data.getClass().getName()));
|
||||
this.headers.put(prefix + CloudEventMessageUtils._SOURCE, URI.create("https://spring.io/"));
|
||||
}
|
||||
MessageHeaders headers = new MessageHeaders(this.headers);
|
||||
GenericMessage<T> message = new GenericMessage<T>(this.data, headers);
|
||||
|
||||
@@ -24,6 +24,7 @@ import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.springframework.cloud.function.context.message.MessageUtils;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.converter.ContentTypeResolver;
|
||||
@@ -47,7 +48,17 @@ import org.springframework.util.StringUtils;
|
||||
*/
|
||||
public final class CloudEventMessageUtils {
|
||||
|
||||
private static final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver();
|
||||
private static final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver() {
|
||||
|
||||
@Override
|
||||
public MimeType resolve(@Nullable MessageHeaders headers) {
|
||||
if (headers.containsKey("content-type")) { // this is temporary workaround for RSocket
|
||||
return MimeType.valueOf(headers.get("content-type").toString());
|
||||
}
|
||||
return super.resolve(headers);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
private static Field MESSAGE_HEADERS = ReflectionUtils.findField(MessageHeaders.class, "headers");
|
||||
|
||||
@@ -235,7 +246,7 @@ public final class CloudEventMessageUtils {
|
||||
String dataContentType = StringUtils.hasText(inputContentType) ? inputContentType
|
||||
: MimeTypeUtils.APPLICATION_JSON_VALUE;
|
||||
|
||||
String suffix = contentType.getSubtypeSuffix();
|
||||
String suffix = contentType.getSubtypeSuffix() == null ? "json" : contentType.getSubtypeSuffix();
|
||||
MimeType cloudEventDeserializationContentType = MimeTypeUtils
|
||||
.parseMimeType(contentType.getType() + "/" + suffix);
|
||||
Message<?> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)
|
||||
|
||||
@@ -88,7 +88,12 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
|
||||
Message<?> resultMessage = null; //result instanceof Message ? (Message<?>) result : null;
|
||||
CloudEventMessageBuilder<?> messageBuilder;
|
||||
if (result instanceof Message) {
|
||||
messageBuilder = CloudEventMessageBuilder.fromMessage((Message<?>) result);
|
||||
if (CloudEventMessageUtils.isCloudEvent((Message<?>) result)) {
|
||||
messageBuilder = CloudEventMessageBuilder.fromMessage((Message<?>) result);
|
||||
}
|
||||
else {
|
||||
return (Message<?>) result;
|
||||
}
|
||||
}
|
||||
else {
|
||||
messageBuilder = CloudEventMessageBuilder
|
||||
@@ -109,6 +114,6 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
|
||||
private String getApplicationName() {
|
||||
ConfigurableEnvironment environment = this.applicationContext.getEnvironment();
|
||||
String name = environment.getProperty("spring.application.name");
|
||||
return (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId());
|
||||
return (StringUtils.hasText(name) ? name : "");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,7 +72,7 @@ public class CloudEventFunctionTests {
|
||||
*/
|
||||
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
|
||||
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName());
|
||||
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
|
||||
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/"));
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -112,8 +112,8 @@ public class CloudEventFunctionTests {
|
||||
|
||||
assertThat(CloudEventMessageUtils.isCloudEvent(message)).isTrue();
|
||||
assertThat(CloudEventMessageUtils.getType(message)).isEqualTo(SpringReleaseEvent.class.getName());
|
||||
assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/application-application"));
|
||||
assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/application-application"));
|
||||
assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/"));
|
||||
assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/"));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -149,8 +149,8 @@ public class CloudEventFunctionTests {
|
||||
|
||||
assertThat(CloudEventMessageUtils.isCloudEvent(message)).isTrue();
|
||||
assertThat(CloudEventMessageUtils.getType(message)).isEqualTo(SpringReleaseEvent.class.getName());
|
||||
assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/application-application"));
|
||||
assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/application-application"));
|
||||
assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/"));
|
||||
assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/"));
|
||||
}
|
||||
|
||||
|
||||
@@ -179,7 +179,7 @@ public class CloudEventFunctionTests {
|
||||
*/
|
||||
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
|
||||
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName());
|
||||
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
|
||||
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/"));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -217,7 +217,7 @@ public class CloudEventFunctionTests {
|
||||
// */
|
||||
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
|
||||
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName());
|
||||
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
|
||||
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/"));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -291,7 +291,7 @@ public class CloudEventFunctionTests {
|
||||
*/
|
||||
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
|
||||
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName());
|
||||
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
|
||||
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/"));
|
||||
}
|
||||
|
||||
private Function<Object, Object> lookup(String functionDefinition, Class<?>... configClass) {
|
||||
|
||||
Reference in New Issue
Block a user