diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java
index af30a2bb8..cd2cda4f0 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java
@@ -202,6 +202,17 @@ public final class CloudEventMessageUtils {
return get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type);
}
+ /**
+ * Will attempt to convert 'inputMessage' to a binary-mode Cloud Event {@link Message}.
+ * This typically happens when 'inputMessage' represents Cloud Event in structured-mode.
+ *
+ * In the event the message already represents Cloud Event in binary-mode, or this
+ * message does not represent Cloud Event at all, it will be returned unchanged.
+ *
+ * @param inputMessage instance of incoming {@link Message}
+ * @param messageConverter instance of {@link MessageConverter} to assist with type conversion.
+ * @return instance of {@link Message} representing Cloud Event in binary-mode or unchanged 'inputMessage'.
+ */
@SuppressWarnings("unchecked")
public static Message> toBinary(Message> inputMessage, MessageConverter messageConverter) {
@@ -219,6 +230,7 @@ public final class CloudEventMessageUtils {
: MimeTypeUtils.APPLICATION_JSON_VALUE;
String suffix = contentType.getSubtypeSuffix();
+ Assert.hasText(suffix, "Content-type 'suffix' can not be determined from " + contentType);
MimeType cloudEventDeserializationContentType = MimeTypeUtils
.parseMimeType(contentType.getType() + "/" + suffix);
Message> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)
@@ -238,6 +250,13 @@ public final class CloudEventMessageUtils {
return inputMessage;
}
+ /**
+ * Will attempt to determine based on the headers the origin of Message (e.g., HTTP, Kafka etc)
+ * and based on this designate prefix to be used for Cloud Events attributes (i.e., `ce-` or `ce_` etc).
+ *
+ * @param messageHeaders instance of {@link MessageHeaders}
+ * @return prefix to be used for Cloud Events attributes
+ */
public static String determinePrefixToUse(MessageHeaders messageHeaders) {
Set keys = messageHeaders.keySet();
if (keys.contains("user-agent")) {
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java
index 99f92421e..d7999a9d2 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java
@@ -47,6 +47,7 @@ import reactor.util.function.Tuples;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.function.cloudevent.CloudEventAttributes;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
@@ -910,13 +911,18 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
* case that requires it since it may contain forwarding url
*/
private boolean containsRetainMessageSignalInHeaders(Message message) {
- for (String headerName : message.getHeaders().keySet()) {
- if (headerName.startsWith("lambda") ||
- headerName.startsWith("scf-func-name")) {
- return true;
- }
+ if (new CloudEventAttributes(message.getHeaders()).isValidCloudEvent()) {
+ return true;
+ }
+ else {
+ for (String headerName : message.getHeaders().keySet()) {
+ if (headerName.startsWith("lambda") ||
+ headerName.startsWith("scf-func-name")) {
+ return true;
+ }
+ }
+ return false;
}
- return false;
}
/*
diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java
index f7b8c0084..d5165e013 100644
--- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java
+++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2020 the original author or authors.
+ * Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,7 +34,6 @@ import reactor.core.publisher.Mono;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
-import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.message.MessageUtils;
@@ -59,16 +58,11 @@ public class RequestProcessor {
private static Log logger = LogFactory.getLog(RequestProcessor.class);
-
- private final FunctionCatalog functionCatalog;
-
private final JsonMapper mapper;
- public RequestProcessor(FunctionCatalog functionCatalog,
- ObjectProvider mapper,
+ public RequestProcessor(ObjectProvider mapper,
ObjectProvider codecs) {
this.mapper = mapper.getIfAvailable();
- this.functionCatalog = functionCatalog;
}
public static FunctionWrapper wrapper(FunctionInvocationWrapper function) {
@@ -186,35 +180,27 @@ public class RequestProcessor {
private Mono> response(FunctionWrapper request, Object handler,
Publisher> result, Boolean single, boolean getter) {
BodyBuilder builder = ResponseEntity.ok();
- if (((FunctionInvocationWrapper) handler).isInputTypeMessage()) {
- result = Flux.from(result)
- .map(message -> MessageUtils.unpack(handler, message))
- .doOnNext(value -> addHeaders(builder, value))
- .map(message -> message.getPayload());
+ if (result instanceof Mono) {
+ result = Mono.from(result)
+ .map(message -> MessageUtils.unpack(handler, message))
+ .doOnNext(value -> {
+ addHeaders(builder, value);
+ if (!isValidCloudEvent(value.getHeaders().keySet())) {
+ builder.headers(HeaderUtils.sanitize(request.headers()));
+ }
+ })
+ .map(message -> message.getPayload());
}
else {
- if (result instanceof Mono) {
- result = Mono.from(result)
- .map(message -> MessageUtils.unpack(handler, message))
- .doOnNext(value -> {
- addHeaders(builder, value);
- if (!isValidCloudEvent(value.getHeaders().keySet())) {
- builder.headers(HeaderUtils.sanitize(request.headers()));
- }
- })
- .map(message -> message.getPayload());
- }
- else {
- result = Flux.from(result)
- .map(message -> MessageUtils.unpack(handler, message))
- .doOnNext(value -> {
- addHeaders(builder, value);
- if (!isValidCloudEvent(value.getHeaders().keySet())) {
- builder.headers(HeaderUtils.sanitize(request.headers()));
- }
- })
- .map(message -> message.getPayload());
- }
+ result = Flux.from(result)
+ .map(message -> MessageUtils.unpack(handler, message))
+ .doOnNext(value -> {
+ addHeaders(builder, value);
+ if (!isValidCloudEvent(value.getHeaders().keySet())) {
+ builder.headers(HeaderUtils.sanitize(request.headers()));
+ }
+ })
+ .map(message -> message.getPayload());
}
if (isOutputSingle(handler)
@@ -231,7 +217,7 @@ public class RequestProcessor {
return Mono.from(result).flatMap(body -> Mono.just(builder.body(body)));
}
- public boolean isValidCloudEvent(Set headerKeys) {
+ private boolean isValidCloudEvent(Set headerKeys) {
return headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)
&& headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)
&& headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)
diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java
index bc3f12331..5e3e4a0de 100644
--- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java
+++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java
@@ -102,7 +102,6 @@ class FunctionEndpointInitializer implements ApplicationContextInitializer new RequestProcessor(
- context.getBean(FunctionCatalog.class),
context.getBeanProvider(JsonMapper.class),
context.getBeanProvider(ServerCodecConfigurer.class)));
context.registerBean(FunctionEndpointFactory.class,
diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java
index ca0c5ca9f..112d9ff62 100644
--- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java
+++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java
@@ -114,10 +114,7 @@ public final class FunctionWebUtils {
result = ((Mono) result).map(v -> postProcessResult(v, isMessage));
}
else if (result instanceof Message) {
- if (!isMessage) {
-// result = ((Message) result).getPayload();
- }
- else if (((Message) result).getPayload() instanceof byte[]) {
+ if (((Message) result).getPayload() instanceof byte[]) {
String str = new String((byte[]) ((Message) result).getPayload());
result = MessageBuilder.withPayload(str).copyHeaders(((Message) result).getHeaders()).build();
}