From 1925b490dc7a87231530ee2c5f013ee98c96d457 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Wed, 25 Nov 2020 16:07:08 +0000 Subject: [PATCH] Add a new strategy for header enrichment There was a TODO in the cloud events section of the function catalog. This extracts that into a strategy and autoconfigures it: CloudEventOutputMessageHeaderEnricher. --- ...CloudEventOutputMessageHeaderEnricher.java | 83 +++++ .../BeanFactoryAwareFunctionRegistry.java | 44 +-- .../catalog/SimpleFunctionRegistry.java | 344 ++++++++++-------- .../config/CloudEventAutoConfiguration.java | 34 ++ .../CompositeOutputMessageHeaderEnricher.java | 44 +++ .../message/OutputMessageHeaderEnricher.java | 28 ++ .../main/resources/META-INF/spring.factories | 3 +- 7 files changed, 391 insertions(+), 189 deletions(-) create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventOutputMessageHeaderEnricher.java create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventAutoConfiguration.java create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/CompositeOutputMessageHeaderEnricher.java create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/OutputMessageHeaderEnricher.java diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventOutputMessageHeaderEnricher.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventOutputMessageHeaderEnricher.java new file mode 100644 index 000000000..31c07c462 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventOutputMessageHeaderEnricher.java @@ -0,0 +1,83 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.function.cloudevent; + +import org.springframework.beans.BeansException; +import org.springframework.cloud.function.context.message.OutputMessageHeaderEnricher; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.core.Ordered; +import org.springframework.core.env.Environment; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.ClassUtils; +import org.springframework.util.StringUtils; + +/** + * @author Dave Syer + * + */ +public class CloudEventOutputMessageHeaderEnricher + implements OutputMessageHeaderEnricher, ApplicationContextAware, Ordered { + + private ApplicationContext applicationContext; + + private CloudEventAttributesProvider cloudEventAttributesProvider; + + private static final String CLOUD_EVENT_TYPE_NAME = "io.cloudevents.api.CloudEvent"; + + private static Class CLOUD_EVENT_TYPE = ClassUtils.isPresent(CLOUD_EVENT_TYPE_NAME, null) + ? ClassUtils.resolveClassName(CLOUD_EVENT_TYPE_NAME, null) : null; + + @Override + public int getOrder() { + return 0; + } + + @Override + public Message enrich(Message output) { + Object invocationResult = output.getPayload(); + if (CLOUD_EVENT_TYPE != null && CLOUD_EVENT_TYPE.isAssignableFrom(invocationResult.getClass())) { + // User is sending us an actual CloudEvent, so no need to guess the attributes + return output; + } + CloudEventAttributes generatedCeHeaders = CloudEventMessageUtils.generateAttributes(output, + invocationResult.getClass().getName(), getApplicationName()); + CloudEventAttributes attributes = new CloudEventAttributes(generatedCeHeaders, + CloudEventMessageUtils.determinePrefixToUse(output.getHeaders())); + if (cloudEventAttributesProvider != null) { + // Global defaults can easily be changed by injecting one of these + cloudEventAttributesProvider.generateDefaultCloudEventHeaders(attributes); + } + return MessageBuilder.withPayload(invocationResult).copyHeaders(attributes).build(); + } + + private String getApplicationName() { + Environment environment = this.applicationContext.getEnvironment(); + String name = environment.getProperty("spring.application.name"); + return "http://spring.io/" + + (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId()); + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + if (applicationContext.getBeanNamesForType(CloudEventAttributesProvider.class).length > 0) { + this.cloudEventAttributesProvider = applicationContext.getBean(CloudEventAttributesProvider.class); + } + } + +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index 60261c1a1..0047b811d 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -20,10 +20,10 @@ import java.lang.reflect.Method; import java.lang.reflect.Type; import java.util.Arrays; import java.util.Set; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; @@ -32,21 +32,17 @@ import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; -import org.springframework.cloud.function.cloudevent.CloudEventAttributes; -import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; -import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; +import org.springframework.cloud.function.context.message.CompositeOutputMessageHeaderEnricher; +import org.springframework.cloud.function.context.message.OutputMessageHeaderEnricher; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.convert.ConversionService; -import org.springframework.core.env.ConfigurableEnvironment; -import org.springframework.messaging.Message; import org.springframework.messaging.converter.CompositeMessageConverter; -import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StringUtils; /** @@ -59,7 +55,7 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp private GenericApplicationContext applicationContext; - private CloudEventAttributesProvider cloudEventAtttributesProvider; + private OutputMessageHeaderEnricher enricher; public BeanFactoryAwareFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper) { @@ -69,8 +65,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (GenericApplicationContext) applicationContext; - if (applicationContext.getBeanNamesForType(CloudEventAttributesProvider.class).length > 0) { - this.cloudEventAtttributesProvider = applicationContext.getBean(CloudEventAttributesProvider.class); + if (applicationContext.getBeanNamesForType(OutputMessageHeaderEnricher.class).length > 0) { + this.enricher = new CompositeOutputMessageHeaderEnricher(applicationContext + .getBeanProvider(OutputMessageHeaderEnricher.class).orderedStream().collect(Collectors.toList())); } } @@ -164,36 +161,13 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes); } - if (function != null) { - BiFunction, Object, Message> invocationResultHeaderEnricher = new BiFunction, Object, Message>() { - @Override - public Message apply(Message inputMessage, Object invocationResult) { - // TODO: Factor it out! Cloud Events specific code - CloudEventAttributes generatedCeHeaders = CloudEventMessageUtils.generateAttributes(inputMessage, - invocationResult.getClass().getName(), getApplicationName()); - CloudEventAttributes attributes = new CloudEventAttributes(generatedCeHeaders, - CloudEventMessageUtils.determinePrefixToUse(inputMessage.getHeaders())); - if (cloudEventAtttributesProvider != null) { - cloudEventAtttributesProvider.generateDefaultCloudEventHeaders(attributes); - } - Message message = MessageBuilder.withPayload(invocationResult).copyHeaders(attributes).build(); - - return message; - } - }; - function.setOutputMessageHeaderEnricher(invocationResultHeaderEnricher); + if (function != null && this.enricher != null) { + function.setOutputMessageHeaderEnricher(this.enricher); } return (T) function; } - private String getApplicationName() { - ConfigurableEnvironment environment = this.applicationContext.getEnvironment(); - String name = environment.getProperty("spring.application.name"); - return "http://spring.io/" - + (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId()); - } - private Object discoverFunctionInBeanFactory(String functionName) { Object functionCandidate = null; if (this.applicationContext.containsBean(functionName)) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index fc2b56d8b..b27bf341f 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeSet; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -53,6 +52,7 @@ import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.config.RoutingFunction; +import org.springframework.cloud.function.context.message.OutputMessageHeaderEnricher; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.core.ResolvableType; import org.springframework.core.convert.ConversionService; @@ -69,20 +69,21 @@ import org.springframework.util.ObjectUtils; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; - /** - * Implementation of {@link FunctionCatalog} and {@link FunctionRegistry} which - * does not depend on Spring's {@link BeanFactory}. - * Each function must be registered with it explicitly to benefit from features - * such as type conversion, composition, POJO etc. + * Implementation of {@link FunctionCatalog} and {@link FunctionRegistry} which does not + * depend on Spring's {@link BeanFactory}. Each function must be registered with it + * explicitly to benefit from features such as type conversion, composition, POJO etc. * * @author Oleg Zhurakousky * */ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspector { + protected Log logger = LogFactory.getLog(this.getClass()); + /* - * - do we care about FunctionRegistration after it's been registered? What additional value does it bring? + * - do we care about FunctionRegistration after it's been registered? What additional + * value does it bring? * */ @@ -101,7 +102,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect @Autowired(required = false) private FunctionAroundWrapper functionAroundWrapper; - public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper) { + public SimpleFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, + JsonMapper jsonMapper) { Assert.notNull(messageConverter, "'messageConverter' must not be null"); Assert.notNull(jsonMapper, "'jsonMapper' must not be null"); this.conversionService = conversionService; @@ -143,7 +145,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect this.functionRegistrations.add(registration); } - //----- + // ----- @Override public Set getNames(Class type) { @@ -173,7 +175,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect function = this.compose(type, functionDefinition); } - if (function != null && !ObjectUtils.isEmpty(expectedOutputMimeTypes)) { + if (function != null && !ObjectUtils.isEmpty(expectedOutputMimeTypes)) { function.expectedOutputContentType = expectedOutputMimeTypes; } else if (logger.isDebugEnabled()) { @@ -186,23 +188,19 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } /** - * This method will make sure that if there is only one function in catalog - * it can be looked up by any name or no name. - * It does so by attempting to determine the default function name - * (the only function in catalog) and checking if it matches the provided name - * replacing it if it does not. + * This method will make sure that if there is only one function in catalog it can be + * looked up by any name or no name. It does so by attempting to determine the default + * function name (the only function in catalog) and checking if it matches the + * provided name replacing it if it does not. */ String normalizeFunctionDefinition(String functionDefinition) { - functionDefinition = StringUtils.hasText(functionDefinition) - ? functionDefinition.replaceAll(",", "|") + functionDefinition = StringUtils.hasText(functionDefinition) ? functionDefinition.replaceAll(",", "|") : System.getProperty(FunctionProperties.FUNCTION_DEFINITION, ""); if (!this.getNames(null).contains(functionDefinition)) { List eligibleFunction = this.getNames(null).stream() - .filter(name -> !RoutingFunction.FUNCTION_NAME.equals(name)) - .collect(Collectors.toList()); - if (eligibleFunction.size() == 1 - && !eligibleFunction.get(0).equals(functionDefinition) + .filter(name -> !RoutingFunction.FUNCTION_NAME.equals(name)).collect(Collectors.toList()); + if (eligibleFunction.size() == 1 && !eligibleFunction.get(0).equals(functionDefinition) && !functionDefinition.contains("|")) { functionDefinition = eligibleFunction.get(0); } @@ -211,9 +209,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } /** - * This is primarily to support spring-cloud-sleauth. - * There is no current use cases in functions where it is used. - * The approach may change in the future. + * This is primarily to support spring-cloud-sleauth. There is no current use cases in + * functions where it is used. The approach may change in the future. */ private FunctionInvocationWrapper wrapInAroundAviceIfNecessary(FunctionInvocationWrapper function) { FunctionInvocationWrapper wrappedFunction = function; @@ -234,12 +231,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ private FunctionInvocationWrapper findFunctionInFunctionRegistrations(String functionName) { FunctionRegistration functionRegistration = this.functionRegistrations.stream() - .filter(fr -> fr.getNames().contains(functionName)) - .findFirst() - .orElseGet(() -> null); - return functionRegistration != null - ? this.invocationWrapperInstance(functionName, functionRegistration.getTarget(), functionRegistration.getType().getType()) - : null; + .filter(fr -> fr.getNames().contains(functionName)).findFirst().orElseGet(() -> null); + return functionRegistration != null ? this.invocationWrapperInstance(functionName, + functionRegistration.getTarget(), functionRegistration.getType().getType()) : null; } @@ -247,7 +241,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect * */ private FunctionInvocationWrapper compose(Class type, String functionDefinition) { - String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|"); + String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), + "|"); FunctionInvocationWrapper composedFunction = null; for (String functionName : functionNames) { @@ -260,9 +255,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect composedFunction = function; } else { - FunctionInvocationWrapper andThenFunction = - invocationWrapperInstance(functionName, function.getTarget(), function.inputType, function.outputType); - composedFunction = (FunctionInvocationWrapper) composedFunction.andThen((Function) andThenFunction); + FunctionInvocationWrapper andThenFunction = invocationWrapperInstance(functionName, + function.getTarget(), function.inputType, function.outputType); + composedFunction = (FunctionInvocationWrapper) composedFunction + .andThen((Function) andThenFunction); } this.wrappedFunctionDefinitions.put(composedFunction.functionDefinition, composedFunction); } @@ -276,14 +272,16 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect /* * */ - private FunctionInvocationWrapper invocationWrapperInstance(String functionDefinition, Object target, Type inputType, Type outputType) { + private FunctionInvocationWrapper invocationWrapperInstance(String functionDefinition, Object target, + Type inputType, Type outputType) { return new FunctionInvocationWrapper(functionDefinition, target, inputType, outputType); } /* * */ - private FunctionInvocationWrapper invocationWrapperInstance(String functionDefinition, Object target, Type functionType) { + private FunctionInvocationWrapper invocationWrapperInstance(String functionDefinition, Object target, + Type functionType) { return invocationWrapperInstance(functionDefinition, target, FunctionTypeUtils.isSupplier(functionType) ? null : FunctionTypeUtils.getInputType(functionType), FunctionTypeUtils.getOutputType(functionType)); @@ -293,7 +291,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect * */ @SuppressWarnings("rawtypes") - public class FunctionInvocationWrapper implements Function, Consumer, Supplier, Runnable { + public class FunctionInvocationWrapper + implements Function, Consumer, Supplier, Runnable { private final Object target; @@ -314,17 +313,17 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect private boolean skipOutputConversion; /* - * This is primarily to support Stream's ability to access - * un-converted payload (e.g., to evaluate expression on some attribute of a payload) - * It does not have a setter/getter and can only be set via reflection. - * It is not intended to remain here and will be removed as soon as particular elements - * of stream will be refactored to address this. + * This is primarily to support Stream's ability to access un-converted payload + * (e.g., to evaluate expression on some attribute of a payload) It does not have + * a setter/getter and can only be set via reflection. It is not intended to + * remain here and will be removed as soon as particular elements of stream will + * be refactored to address this. */ private Function enhancer; - private BiFunction, Object, Message> outputMessageHeaderEnricher; + private OutputMessageHeaderEnricher outputMessageHeaderEnricher; - void setOutputMessageHeaderEnricher(BiFunction, Object, Message> outputMessageHeaderEnricher) { + void setOutputMessageHeaderEnricher(OutputMessageHeaderEnricher outputMessageHeaderEnricher) { this.outputMessageHeaderEnricher = outputMessageHeaderEnricher; } @@ -336,7 +335,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect this.message = this.inputType != null && FunctionTypeUtils.isMessage(this.inputType); } - FunctionInvocationWrapper(String functionDefinition, Object target, Type inputType, Type outputType) { + FunctionInvocationWrapper(String functionDefinition, Object target, Type inputType, Type outputType) { this.target = target; this.inputType = this.normalizeType(inputType); this.outputType = this.normalizeType(outputType); @@ -346,14 +345,16 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect public void setSkipInputConversion(boolean skipInputConversion) { if (logger.isDebugEnabled() && skipInputConversion) { - logger.debug("'skipInputConversion' was explicitely set to true. No input conversion will be attempted"); + logger.debug( + "'skipInputConversion' was explicitely set to true. No input conversion will be attempted"); } this.skipInputConversion = skipInputConversion; } public void setSkipOutputConversion(boolean skipOutputConversion) { if (logger.isDebugEnabled() && skipOutputConversion) { - logger.debug("'skipOutputConversion' was explicitely set to true. No output conversion will be attempted"); + logger.debug( + "'skipOutputConversion' was explicitely set to true. No output conversion will be attempted"); } this.skipOutputConversion = skipOutputConversion; } @@ -371,23 +372,27 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } /** - * Return the actual {@link Type} of the item of the provided type. - * This method is context specific and is not a general purpose utility method. The context is that the provided - * {@link Type} may represent the input/output of a function where such type could be wrapped in - * {@link Message}, {@link Flux} or {@link Mono}, so this method returns generic value of such type or itself if not wrapped. - * @param type typically input or output Type of the function (see {@link #getInputType()} or {@link #getOutputType()}. + * Return the actual {@link Type} of the item of the provided type. This method is + * context specific and is not a general purpose utility method. The context is + * that the provided {@link Type} may represent the input/output of a function + * where such type could be wrapped in {@link Message}, {@link Flux} or + * {@link Mono}, so this method returns generic value of such type or itself if + * not wrapped. + * @param type typically input or output Type of the function (see + * {@link #getInputType()} or {@link #getOutputType()}. * @return the type of the item if wrapped otherwise the provided type. */ public Type getItemType(Type type) { - if (FunctionTypeUtils.isPublisher(type) || FunctionTypeUtils.isMessage(type) || FunctionTypeUtils.isTypeCollection(type)) { + if (FunctionTypeUtils.isPublisher(type) || FunctionTypeUtils.isMessage(type) + || FunctionTypeUtils.isTypeCollection(type)) { type = FunctionTypeUtils.getGenericType(type); } return type; } /** - * Use individual {@link #getInputType()}, {@link #getOutputType()} and their variants as well as - * other supporting operations instead. + * Use individual {@link #getInputType()}, {@link #getOutputType()} and their + * variants as well as other supporting operations instead. * @deprecated since 3.1 */ @Deprecated @@ -420,7 +425,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ @Override public Object apply(Object input) { - if (logger.isDebugEnabled() && !(input instanceof Publisher)) { + if (logger.isDebugEnabled() && !(input instanceof Publisher)) { logger.debug("Invoking function " + this); } Object result = this.doApply(input); @@ -476,7 +481,6 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect return FunctionTypeUtils.isMessage(this.outputType); } - public boolean isRoutingFunction() { return this.target instanceof RoutingFunction; } @@ -487,12 +491,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect @SuppressWarnings("unchecked") @Override public Function andThen(Function after) { - Assert.isTrue(after instanceof FunctionInvocationWrapper, "Composed function must be an instanceof FunctionInvocationWrapper."); + Assert.isTrue(after instanceof FunctionInvocationWrapper, + "Composed function must be an instanceof FunctionInvocationWrapper."); if (FunctionTypeUtils.isMultipleArgumentType(this.inputType) || FunctionTypeUtils.isMultipleArgumentType(this.outputType) || FunctionTypeUtils.isMultipleArgumentType(((FunctionInvocationWrapper) after).inputType) || FunctionTypeUtils.isMultipleArgumentType(((FunctionInvocationWrapper) after).outputType)) { - throw new UnsupportedOperationException("Composition of functions with multiple arguments is not supported at the moment"); + throw new UnsupportedOperationException( + "Composition of functions with multiple arguments is not supported at the moment"); } Function rawComposedFunction = v -> ((FunctionInvocationWrapper) after).doApply(doApply(v)); @@ -501,35 +507,39 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect Type composedFunctionType; if (afterWrapper.outputType == null) { - composedFunctionType = ResolvableType.forClassWithGenerics(Consumer.class, this.inputType == null - ? null - : ResolvableType.forType(this.inputType)).getType(); + composedFunctionType = ResolvableType.forClassWithGenerics(Consumer.class, + this.inputType == null ? null : ResolvableType.forType(this.inputType)).getType(); } else if (this.inputType == null && afterWrapper.outputType != null) { ResolvableType composedOutputType; if (FunctionTypeUtils.isFlux(this.outputType)) { - composedOutputType = ResolvableType.forClassWithGenerics(Flux.class, ResolvableType.forType(afterWrapper.outputType)); + composedOutputType = ResolvableType.forClassWithGenerics(Flux.class, + ResolvableType.forType(afterWrapper.outputType)); } else if (FunctionTypeUtils.isMono(this.outputType)) { - composedOutputType = ResolvableType.forClassWithGenerics(Mono.class, ResolvableType.forType(afterWrapper.outputType)); + composedOutputType = ResolvableType.forClassWithGenerics(Mono.class, + ResolvableType.forType(afterWrapper.outputType)); } else { composedOutputType = ResolvableType.forType(afterWrapper.outputType); } - composedFunctionType = ResolvableType.forClassWithGenerics(Supplier.class, composedOutputType).getType(); + composedFunctionType = ResolvableType.forClassWithGenerics(Supplier.class, composedOutputType) + .getType(); } else if (this.outputType == null) { throw new IllegalArgumentException("Can NOT compose anything with Consumer"); } else { - composedFunctionType = ResolvableType.forClassWithGenerics(Function.class, - ResolvableType.forType(this.inputType), - ResolvableType.forType(((FunctionInvocationWrapper) after).outputType)).getType(); + composedFunctionType = ResolvableType + .forClassWithGenerics(Function.class, ResolvableType.forType(this.inputType), + ResolvableType.forType(((FunctionInvocationWrapper) after).outputType)) + .getType(); } String composedName = this.functionDefinition + "|" + afterWrapper.functionDefinition; - FunctionInvocationWrapper composedFunction = invocationWrapperInstance(composedName, rawComposedFunction, composedFunctionType); + FunctionInvocationWrapper composedFunction = invocationWrapperInstance(composedName, rawComposedFunction, + composedFunctionType); composedFunction.composed = true; return (Function) composedFunction; @@ -548,12 +558,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ @Override public String toString() { - return this.functionDefinition + (this.isComposed() ? "" : "<" + this.inputType + ", " + this.outputType + ">"); + return this.functionDefinition + + (this.isComposed() ? "" : "<" + this.inputType + ", " + this.outputType + ">"); } /** * Returns true if this function wrapper represents a composed function. - * @return true if this function wrapper represents a composed function otherwise false + * @return true if this function wrapper represents a composed function otherwise + * false */ boolean isComposed() { return this.composed; @@ -593,7 +605,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } /** - * Will return Object.class if type is represented as TypeVariable(T) or WildcardType(?). + * Will return Object.class if type is represented as TypeVariable(T) or + * WildcardType(?). */ private Type normalizeType(Type type) { if (type != null) { @@ -606,13 +619,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect * */ private Class getRawClassFor(@Nullable Type type) { - return type instanceof TypeVariable || type instanceof WildcardType - ? Object.class + return type instanceof TypeVariable || type instanceof WildcardType ? Object.class : FunctionTypeUtils.getRawType(type); } /** - * Will wrap the result in a Message if necessary and will copy input headers to the output message. + * Will wrap the result in a Message if necessary and will copy input headers to + * the output message. */ @SuppressWarnings("unchecked") private Object enrichInvocationResultIfNecessary(Object input, Object result) { @@ -620,14 +633,17 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect if (result instanceof Message) { Map headersMap = (Map) ReflectionUtils .getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders()); - this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v)); + this.sanitizeHeaders(((Message) input).getHeaders()) + .forEach((k, v) -> headersMap.putIfAbsent(k, v)); } else { + Message output = MessageBuilder.withPayload(result) + .copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build(); if (this.outputMessageHeaderEnricher != null) { - result = this.outputMessageHeaderEnricher.apply((Message) input, result); + result = this.outputMessageHeaderEnricher.enrich(output); } else { - result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build(); + result = output; } } } @@ -651,9 +667,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect * */ private Object fluxifyInputIfNecessary(Object input) { - if (!(input instanceof Publisher) && this.isTypePublisher(this.inputType) && !FunctionTypeUtils.isMultipleArgumentType(this.inputType)) { - return input == null - ? FunctionTypeUtils.isMono(this.inputType) ? Mono.empty() : Flux.empty() + if (!(input instanceof Publisher) && this.isTypePublisher(this.inputType) + && !FunctionTypeUtils.isMultipleArgumentType(this.inputType)) { + return input == null ? FunctionTypeUtils.isMono(this.inputType) ? Mono.empty() : Flux.empty() : FunctionTypeUtils.isMono(this.inputType) ? Mono.just(input) : Flux.just(input); } return input; @@ -667,20 +683,24 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect Object result; if (!this.isTypePublisher(this.inputType) && convertedInput instanceof Publisher) { result = convertedInput instanceof Mono - ? Mono.from((Publisher) convertedInput).map(value -> this.invokeFunctionAndEnrichResultIfNecessary(value)) - .doOnError(ex -> logger.error("Failed to invoke function '" + this.functionDefinition + "'", (Throwable) ex)) - : Flux.from((Publisher) convertedInput).map(value -> this.invokeFunctionAndEnrichResultIfNecessary(value)) - .doOnError(ex -> logger.error("Failed to invoke function '" + this.functionDefinition + "'", (Throwable) ex)); + ? Mono.from((Publisher) convertedInput) + .map(value -> this.invokeFunctionAndEnrichResultIfNecessary(value)) + .doOnError(ex -> logger.error( + "Failed to invoke function '" + this.functionDefinition + "'", (Throwable) ex)) + : Flux.from((Publisher) convertedInput) + .map(value -> this.invokeFunctionAndEnrichResultIfNecessary(value)) + .doOnError(ex -> logger.error( + "Failed to invoke function '" + this.functionDefinition + "'", (Throwable) ex)); } else { result = this.invokeFunctionAndEnrichResultIfNecessary(convertedInput); if (result instanceof Flux) { - result = ((Flux) result).doOnError(ex -> logger.error("Failed to invoke function '" - + this.functionDefinition + "'", (Throwable) ex)); + result = ((Flux) result).doOnError(ex -> logger + .error("Failed to invoke function '" + this.functionDefinition + "'", (Throwable) ex)); } else if (result instanceof Mono) { - result = ((Mono) result).doOnError(ex -> logger.error("Failed to invoke function '" - + this.functionDefinition + "'", (Throwable) ex)); + result = ((Mono) result).doOnError(ex -> logger + .error("Failed to invoke function '" + this.functionDefinition + "'", (Throwable) ex)); } } return result; @@ -707,9 +727,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } Object result = ((Function) this.target).apply(inputValue); - return value instanceof OriginalMessageHolder - ? this.enrichInvocationResultIfNecessary(((OriginalMessageHolder) value).getOriginalMessage(), result) - : result; + return value instanceof OriginalMessageHolder ? this.enrichInvocationResultIfNecessary( + ((OriginalMessageHolder) value).getOriginalMessage(), result) : result; } /* @@ -720,20 +739,20 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect Object result = null; if (this.isTypePublisher(this.inputType)) { if (convertedInput instanceof Flux) { - result = ((Flux) convertedInput) - .transform(flux -> { - flux = Flux.from((Publisher) flux).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v)); - ((Consumer) this.target).accept(flux); - return Mono.ignoreElements((Flux) flux); - }).then(); + result = ((Flux) convertedInput).transform(flux -> { + flux = Flux.from((Publisher) flux) + .map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v)); + ((Consumer) this.target).accept(flux); + return Mono.ignoreElements((Flux) flux); + }).then(); } else { - result = ((Mono) convertedInput) - .transform(mono -> { - mono = Mono.from((Publisher) mono).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v)); - ((Consumer) this.target).accept(mono); - return Mono.ignoreElements((Flux) mono); - }).then(); + result = ((Mono) convertedInput).transform(mono -> { + mono = Mono.from((Publisher) mono) + .map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v)); + ((Consumer) this.target).accept(mono); + return Mono.ignoreElements((Flux) mono); + }).then(); } } else if (convertedInput instanceof Publisher) { @@ -771,12 +790,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } return parsedArgumentValues; } - throw new UnsupportedOperationException("At the moment only Tuple-based function are supporting multiple arguments"); + throw new UnsupportedOperationException( + "At the moment only Tuple-based function are supporting multiple arguments"); } @SuppressWarnings("unchecked") private boolean isInputConversionNecessary(Object input, Type type) { - if (type == null || this.getRawClassFor(type) == Void.class || this.target instanceof RoutingFunction || this.isComposed()) { + if (type == null || this.getRawClassFor(type) == Void.class || this.target instanceof RoutingFunction + || this.isComposed()) { if (this.getRawClassFor(type) == Void.class) { if (input instanceof Message) { input = ((Message) input).getPayload(); @@ -790,6 +811,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } return true; } + /* * */ @@ -813,13 +835,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect convertedInput = Tuples.fromArray(convertedInputs); } else if (this.skipInputConversion) { - convertedInput = this.isInputTypeMessage() - ? input + convertedInput = this.isInputTypeMessage() ? input : new OriginalMessageHolder(((Message) input).getPayload(), (Message) input); } else if (input instanceof Message) { - if (((Message) input).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull") - && !this.isInputTypeMessage()) { //TODO rework + if (((Message) input).getPayload().getClass().getName() + .equals("org.springframework.kafka.support.KafkaNull") && !this.isInputTypeMessage()) { // TODO + // rework return null; } @@ -827,12 +849,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect convertedInput = this.convertInputMessageIfNecessary((Message) input, type); if (convertedInput == null) { // give ConversionService a chance - convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload(), false); + convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload(), + false); } if (convertedInput != null && !FunctionTypeUtils.isMultipleArgumentType(this.inputType)) { convertedInput = !convertedInput.equals(input) - ? new OriginalMessageHolder(convertedInput, (Message) input) - : convertedInput; + ? new OriginalMessageHolder(convertedInput, (Message) input) : convertedInput; } if (convertedInput != null && logger.isDebugEnabled()) { logger.debug("Converted Message: " + input + " to: " + convertedInput); @@ -853,16 +875,17 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } /** - * This is an optional conversion which would only happen if `expected-content-type` is - * set as a header in a message or explicitly provided as part of the lookup. + * This is an optional conversion which would only happen if + * `expected-content-type` is set as a header in a message or explicitly provided + * as part of the lookup. */ private Object convertOutputIfNecessary(Object output, Type type, String[] contentType) { if (this.skipOutputConversion) { return output; } if (output instanceof Message && !this.containsRetainMessageSignalInHeaders((Message) output)) { - if (!FunctionTypeUtils.isMessage(type) || - (FunctionTypeUtils.isMessage(type) && Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(type)))) { + if (!FunctionTypeUtils.isMessage(type) || (FunctionTypeUtils.isMessage(type) + && Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(type)))) { output = ((Message) output).getPayload(); } } @@ -882,13 +905,16 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect convertedOutput = this.convertOutputPublisherIfNecessary((Publisher) output, type, contentType); } else if (output instanceof Message) { - convertedOutput = this.convertOutputMessageIfNecessary(output, ObjectUtils.isEmpty(contentType) ? null : contentType[0]); + convertedOutput = this.convertOutputMessageIfNecessary(output, + ObjectUtils.isEmpty(contentType) ? null : contentType[0]); } else if (output instanceof Collection && this.isOutputTypeMessage()) { - convertedOutput = this.convertMultipleOutputValuesIfNecessary(output, ObjectUtils.isEmpty(contentType) ? null : contentType); + convertedOutput = this.convertMultipleOutputValuesIfNecessary(output, + ObjectUtils.isEmpty(contentType) ? null : contentType); } else if (ObjectUtils.isArray(output) && !(output instanceof byte[])) { - convertedOutput = this.convertMultipleOutputValuesIfNecessary(output, ObjectUtils.isEmpty(contentType) ? null : contentType); + convertedOutput = this.convertMultipleOutputValuesIfNecessary(output, + ObjectUtils.isEmpty(contentType) ? null : contentType); } else { convertedOutput = messageConverter.toMessage(output, @@ -899,15 +925,16 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } /** - * Will check if message contains any of the headers that are considered to serve as - * signals to retain output as Message (regardless of the output type of function). - * At this moment presence of 'scf-func-name' header or any header that begins with `lambda' - * (use by AWS) will result in this method returning true. + * Will check if message contains any of the headers that are considered to serve + * as signals to retain output as Message (regardless of the output type of + * function). At this moment presence of 'scf-func-name' header or any header that + * begins with `lambda' (use by AWS) will result in this method returning true. */ /* - * TODO we need to investigate if this could be extracted into some type of strategy since at - * the pure core level there is no case for this to ever be true. In fact today it is only AWS Lambda - * case that requires it since it may contain forwarding url + * TODO we need to investigate if this could be extracted into some type of + * strategy since at the pure core level there is no case for this to ever be + * true. In fact today it is only AWS Lambda case that requires it since it may + * contain forwarding url */ private boolean containsRetainMessageSignalInHeaders(Message message) { if (new CloudEventAttributes(message.getHeaders()).isValidCloudEvent()) { @@ -915,8 +942,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } else { for (String headerName : message.getHeaders().keySet()) { - if (headerName.startsWith("lambda") || - headerName.startsWith("scf-func-name")) { + if (headerName.startsWith("lambda") || headerName.startsWith("scf-func-name")) { return true; } } @@ -941,8 +967,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect convertedInput = SimpleFunctionRegistry.this.jsonMapper.fromJson(input, inputType); } } - else if (SimpleFunctionRegistry.this.conversionService != null - && !rawInputType.equals(input.getClass()) + else if (SimpleFunctionRegistry.this.conversionService != null && !rawInputType.equals(input.getClass()) && SimpleFunctionRegistry.this.conversionService.canConvert(input.getClass(), rawInputType)) { convertedInput = SimpleFunctionRegistry.this.conversionService.convert(input, rawInputType); } @@ -956,10 +981,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect * */ private boolean isWrapConvertedInputInMessage(Object convertedInput) { - return this.inputType != null - && FunctionTypeUtils.isMessage(this.inputType) - && !(convertedInput instanceof Message) - && !(convertedInput instanceof Publisher) + return this.inputType != null && FunctionTypeUtils.isMessage(this.inputType) + && !(convertedInput instanceof Message) && !(convertedInput instanceof Publisher) && !(convertedInput instanceof OriginalMessageHolder); } @@ -967,7 +990,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect * */ private Type extractActualValueTypeIfNecessary(Type type) { - if (type instanceof ParameterizedType && (FunctionTypeUtils.isPublisher(type) || FunctionTypeUtils.isMessage(type))) { + if (type instanceof ParameterizedType + && (FunctionTypeUtils.isPublisher(type) || FunctionTypeUtils.isMessage(type))) { return FunctionTypeUtils.getGenericType(type); } return type; @@ -1008,10 +1032,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect if (this.isInputTypeMessage()) { if (convertedInput == null) { /* - * In the event conversion was unsuccessful we simply return the original un-converted message. - * This will help to deal with issues like KafkaNull and others. However if this was not the intention - * of the developer, this would be discovered early in the development process where the - * additional message converter could be added to facilitate the conversion. + * In the event conversion was unsuccessful we simply return the + * original un-converted message. This will help to deal with issues + * like KafkaNull and others. However if this was not the intention of + * the developer, this would be discovered early in the development + * process where the additional message converter could be added to + * facilitate the conversion. */ logger.info("Input type conversion of payload " + message.getPayload() + " resulted in 'null'. " + "Will use the original message as input."); @@ -1019,7 +1045,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } else { if (!(convertedInput instanceof Message)) { - convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build(); + convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()) + .build(); } } } @@ -1034,10 +1061,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect Object[] multipleValueArguments = this.parseMultipleValueArguments(output, outputTypes.length); Object[] convertedOutputs = new Object[outputTypes.length]; for (int i = 0; i < multipleValueArguments.length; i++) { - String[] ctToUse = !ObjectUtils.isEmpty(contentType) - ? new String[]{contentType[i]} - : new String[] {"application/json"}; - Object convertedInput = this.convertOutputIfNecessary(multipleValueArguments[i], outputTypes[i], ctToUse); + String[] ctToUse = !ObjectUtils.isEmpty(contentType) ? new String[] { contentType[i] } + : new String[] { "application/json" }; + Object convertedInput = this.convertOutputIfNecessary(multipleValueArguments[i], outputTypes[i], + ctToUse); convertedOutputs[i] = convertedInput; } return Tuples.fromArray(convertedOutputs); @@ -1050,15 +1077,18 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect private Object convertOutputMessageIfNecessary(Object output, String expectedOutputContetntType) { Map headersMap = (Map) ReflectionUtils .getField(SimpleFunctionRegistry.this.headersField, ((Message) output).getHeaders()); - String contentType = ((Message) output).getHeaders().containsKey(FunctionProperties.EXPECT_CONTENT_TYPE_HEADER) - ? (String) ((Message) output).getHeaders().get(FunctionProperties.EXPECT_CONTENT_TYPE_HEADER) + String contentType = ((Message) output).getHeaders() + .containsKey(FunctionProperties.EXPECT_CONTENT_TYPE_HEADER) + ? (String) ((Message) output).getHeaders() + .get(FunctionProperties.EXPECT_CONTENT_TYPE_HEADER) : expectedOutputContetntType; if (StringUtils.hasText(contentType)) { String[] expectedContentTypes = StringUtils.delimitedListToStringArray(contentType, ","); for (String expectedContentType : expectedContentTypes) { headersMap.put(MessageHeaders.CONTENT_TYPE, expectedContentType); - Object result = messageConverter.toMessage(((Message) output).getPayload(), ((Message) output).getHeaders()); + Object result = messageConverter.toMessage(((Message) output).getPayload(), + ((Message) output).getHeaders()); if (result != null) { return result; } @@ -1072,9 +1102,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ @SuppressWarnings("unchecked") private Object convertMultipleOutputValuesIfNecessary(Object output, String[] contentType) { - Collection outputCollection = ObjectUtils.isArray(output) ? CollectionUtils.arrayToList(output) : (Collection) output; - Collection convertedOutputCollection = outputCollection instanceof List ? new ArrayList<>() : new TreeSet<>(); - Type type = this.isOutputTypeMessage() ? FunctionTypeUtils.getGenericType(this.outputType) : this.outputType; + Collection outputCollection = ObjectUtils.isArray(output) ? CollectionUtils.arrayToList(output) + : (Collection) output; + Collection convertedOutputCollection = outputCollection instanceof List ? new ArrayList<>() + : new TreeSet<>(); + Type type = this.isOutputTypeMessage() ? FunctionTypeUtils.getGenericType(this.outputType) + : this.outputType; for (Object outToConvert : outputCollection) { Object result = this.convertOutputIfNecessary(outToConvert, type, contentType); Assert.notNull(result, () -> "Failed to convert output '" + outToConvert + "'"); @@ -1106,19 +1139,22 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect * */ @SuppressWarnings("unchecked") - private Object convertOutputPublisherIfNecessary(Publisher publisher, Type type, String[] expectedOutputContentType) { + private Object convertOutputPublisherIfNecessary(Publisher publisher, Type type, + String[] expectedOutputContentType) { return publisher instanceof Mono ? Mono.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType)) .doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex)) : Flux.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType)) .doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex)); } + } /** * */ - private static final class OriginalMessageHolder { + private static final class OriginalMessageHolder { + private final Object value; private final Message originalMessage; @@ -1135,5 +1171,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect public Message getOriginalMessage() { return this.originalMessage; } + } + } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventAutoConfiguration.java new file mode 100644 index 000000000..46a4e7dd8 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventAutoConfiguration.java @@ -0,0 +1,34 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.function.context.config; + +import org.springframework.cloud.function.cloudevent.CloudEventOutputMessageHeaderEnricher; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Dave Syer + * + */ +@Configuration(proxyBeanMethods = false) +public class CloudEventAutoConfiguration { + + @Bean + public CloudEventOutputMessageHeaderEnricher cloudEventOutputMessageHeaderEnricher() { + return new CloudEventOutputMessageHeaderEnricher(); + } + +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/CompositeOutputMessageHeaderEnricher.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/CompositeOutputMessageHeaderEnricher.java new file mode 100644 index 000000000..8120d612b --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/CompositeOutputMessageHeaderEnricher.java @@ -0,0 +1,44 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.function.context.message; + +import java.util.List; + +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +/** + * @author Dave Syer + * + */ +public class CompositeOutputMessageHeaderEnricher implements OutputMessageHeaderEnricher { + + private final List delegates; + + public CompositeOutputMessageHeaderEnricher(List delegates) { + this.delegates = delegates; + } + + @Override + public Message enrich(Message output) { + Message result = MessageBuilder.withPayload(output.getPayload()).copyHeaders(output.getHeaders()).build(); + for (OutputMessageHeaderEnricher enricher : delegates) { + result = enricher.enrich(result); + } + return result; + } + +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/OutputMessageHeaderEnricher.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/OutputMessageHeaderEnricher.java new file mode 100644 index 000000000..febd6dd61 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/OutputMessageHeaderEnricher.java @@ -0,0 +1,28 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.function.context.message; + +import org.springframework.messaging.Message; + +/** + * @author Dave Syer + * + */ +public interface OutputMessageHeaderEnricher { + + Message enrich(Message output); + +} diff --git a/spring-cloud-function-context/src/main/resources/META-INF/spring.factories b/spring-cloud-function-context/src/main/resources/META-INF/spring.factories index 1dbbeee8d..11e477a54 100644 --- a/spring-cloud-function-context/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-function-context/src/main/resources/META-INF/spring.factories @@ -1,5 +1,6 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration +org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration,\ +org.springframework.cloud.function.context.config.CloudEventAutoConfiguration org.springframework.cloud.function.context.WrapperDetector=\ org.springframework.cloud.function.context.config.FluxWrapperDetector org.springframework.context.ApplicationContextInitializer=\