Provide initial integration with SDK CloudEvent
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
* Copyright 2020-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,9 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.function.cloudevent;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
|
||||
import org.springframework.cloud.function.core.FunctionInvocationHelper;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
@@ -37,18 +35,8 @@ class CloudEventsFunctionExtensionConfiguration {
|
||||
// The following two beans are intended to be mutually exclusive. Only one should be activated based
|
||||
// on the presence of Cloud Event SDK API
|
||||
@Bean
|
||||
@ConditionalOnMissingClass("io.cloudevents.CloudEvent")
|
||||
@ConditionalOnMissingBean
|
||||
public FunctionInvocationHelper<Message<?>> nativeFunctionInvocationHelper(@Nullable CloudEventHeaderEnricher cloudEventHeadersProvider) {
|
||||
return new CloudEventsFunctionInvocationHelper(cloudEventHeadersProvider);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnClass(name = "io.cloudevents.CloudEvent")
|
||||
@ConditionalOnMissingBean
|
||||
public FunctionInvocationHelper<Message<?>> sdkFunctionInvocationHelper() {
|
||||
// TODO you may need SDKs header provider
|
||||
return null;
|
||||
}
|
||||
// ========================================================
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.converter.CompositeMessageConverter;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
@@ -40,14 +41,26 @@ import org.springframework.util.StringUtils;
|
||||
* @since 3.1
|
||||
*
|
||||
*/
|
||||
class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Message<?>>, ApplicationContextAware {
|
||||
public class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Message<?>>, ApplicationContextAware {
|
||||
|
||||
private ConfigurableApplicationContext applicationContext;
|
||||
|
||||
private final CloudEventHeaderEnricher cloudEventAttributesProvider;
|
||||
|
||||
private CompositeMessageConverter messageConverter;
|
||||
|
||||
private final Class<?> CLOUD_EVENT_CLASS;
|
||||
|
||||
CloudEventsFunctionInvocationHelper(@Nullable CloudEventHeaderEnricher cloudEventHeadersProvider) {
|
||||
this.cloudEventAttributesProvider = cloudEventHeadersProvider;
|
||||
Class<?> clazz = null;
|
||||
try {
|
||||
clazz = Thread.currentThread().getContextClassLoader().loadClass("io.cloudevents.CloudEvent");
|
||||
}
|
||||
catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
CLOUD_EVENT_CLASS = clazz;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -67,10 +80,21 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
|
||||
}
|
||||
}
|
||||
|
||||
public void setMessageConverter(CompositeMessageConverter messageConverter) {
|
||||
this.messageConverter = messageConverter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message<?> postProcessResult(Object result, Message<?> input) {
|
||||
String targetPrefix = CloudEventMessageUtils.determinePrefixToUse(input.getHeaders());
|
||||
return this.doPostProcessResult(result, targetPrefix);
|
||||
Message<?> convertedResult = null;
|
||||
if (this.messageConverter != null && CLOUD_EVENT_CLASS != null && CLOUD_EVENT_CLASS.isAssignableFrom(result.getClass())) {
|
||||
convertedResult = this.messageConverter.toMessage(result, input.getHeaders());
|
||||
}
|
||||
if (convertedResult == null) {
|
||||
String targetPrefix = CloudEventMessageUtils.determinePrefixToUse(input.getHeaders());
|
||||
convertedResult = this.doPostProcessResult(result, targetPrefix);
|
||||
}
|
||||
return convertedResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -31,6 +31,7 @@ import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.FunctionProperties;
|
||||
import org.springframework.cloud.function.context.FunctionRegistry;
|
||||
@@ -110,6 +111,9 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
|
||||
if (!CollectionUtils.isEmpty(mcList)) {
|
||||
messageConverter = new SmartCompositeMessageConverter(mcList);
|
||||
if (functionInvocationHelper instanceof CloudEventsFunctionInvocationHelper) {
|
||||
((CloudEventsFunctionInvocationHelper) functionInvocationHelper).setMessageConverter(messageConverter);
|
||||
}
|
||||
}
|
||||
|
||||
return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper, functionInvocationHelper);
|
||||
|
||||
@@ -71,6 +71,9 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
|
||||
@Nullable
|
||||
public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers) {
|
||||
for (MessageConverter converter : getConverters()) {
|
||||
if (headers.get(MessageHeaders.CONTENT_TYPE) == null) {
|
||||
return null;
|
||||
}
|
||||
Object value = headers.get(MessageHeaders.CONTENT_TYPE).toString();
|
||||
String[] contentTypes = StringUtils.delimitedListToStringArray((String) value, ",");
|
||||
for (String contentType : contentTypes) {
|
||||
|
||||
Reference in New Issue
Block a user