diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java index 25331f28a..9d1b6fa2b 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java @@ -96,7 +96,15 @@ public class CloudEventsFunctionInvocationHelper implements FunctionInvocationHe if (this.messageConverter != null && CLOUD_EVENT_CLASS != null && CLOUD_EVENT_CLASS.isAssignableFrom(result.getClass())) { convertedResult = this.messageConverter.toMessage(result, input.getHeaders()); } - String targetPrefix = CloudEventMessageUtils.determinePrefixToUse(input.getHeaders(), true); + + String targetPrefix = CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; + if (input != null) { + targetPrefix = CloudEventMessageUtils.determinePrefixToUse(input.getHeaders(), true); + } + else if (result instanceof Message) { + targetPrefix = CloudEventMessageUtils.determinePrefixToUse(((Message) result).getHeaders(), true); + } + Assert.hasText(targetPrefix, "Unable to determine prefix for Cloud Event atttributes, " + "which they must have according to protocol specification. Consider adding 'target-protocol' " + "header with values of one of the supported protocols - [kafka, amqp, http]");