diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionCatalog.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionCatalog.java index 53f330396..b39788941 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionCatalog.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionCatalog.java @@ -17,6 +17,11 @@ package org.springframework.cloud.function.context; import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import javax.activation.MimeType; /** @@ -27,15 +32,31 @@ public interface FunctionCatalog { /** - * Will look up the instance of the functional interface by name only and - * acceptedOutputTypes. + * Will look up the instance of the functional interface by name only. + * This lookup method assumes a very specific semantics which are: function sub-type(s) + * expected to be {@code Message}.
+ * For example, + *

+ * {@code Function, Message>} or + *
+ * {@code Function>, Flux>>} or + *
+ * {@code Consumer>>} etc. . . + *

+ * The {@code acceptedOutputMimeTypes} are the string representation of {@link MimeType} where each + * mime-type in the provided array would correspond to the output with the same index + * (for cases of functions with multiple outputs) and is used to convert such output back + * to {@code Message}. + * If you need to provide several accepted types per specific output you can simply delimit + * them with comma (e.g., {@code application/json,text/plain...}). * - * @param instance type - * @param functionDefinition functionDefinition - * @param acceptedOutputTypes acceptedOutputTypes + * @param instance type which should be one of {@link Supplier}, {@link Function} or {@link Consumer}. + * @param functionDefinition the definition of a function (e.g., 'foo' or 'foo|bar') + * @param acceptedOutputMimeTypes acceptedOutputMimeTypes array of string representation of {@link MimeType}s + * used to convert function output back to {@code Message}. * @return instance of the functional interface registered with this catalog */ - default T lookup(String functionDefinition, String... acceptedOutputTypes) { + default T lookup(String functionDefinition, String... acceptedOutputMimeTypes) { throw new UnsupportedOperationException("This instance of FunctionCatalog does not support this operation"); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index 916920a65..f033ee7d9 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -42,7 +42,6 @@ import reactor.util.function.Tuples; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; @@ -82,7 +81,7 @@ import org.springframework.util.StringUtils; * @since 3.0 */ public class BeanFactoryAwareFunctionRegistry - implements FunctionRegistry, FunctionInspector, ApplicationContextAware, SmartInitializingSingleton { + implements FunctionRegistry, FunctionInspector, ApplicationContextAware { private static Log logger = LogFactory.getLog(AbstractSpringFunctionAdapterInitializer.class); @@ -121,20 +120,6 @@ public class BeanFactoryAwareFunctionRegistry return (T) this.compose(null, definition, acceptedOutputTypes); } - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - //TODO do we really need to do that given we no longer do the same for other gunctions? - public void afterSingletonsInstantiated() { - Map beansOfType = this.applicationContext - .getBeansOfType(FunctionRegistration.class); - for (FunctionRegistration fr : beansOfType.values()) { - this.registrationsByFunction.putIfAbsent(fr.getTarget(), fr); - for (Object name : fr.getNames()) { - this.registrationsByName.putIfAbsent((String) name, fr); - } - } - } - @SuppressWarnings("unchecked") @Override public Set getNames(Class type) { @@ -315,8 +300,6 @@ public class BeanFactoryAwareFunctionRegistry this.functionDefinition = functionDefinition; } - - @Override public void accept(Object input) { this.doApply(input, true); @@ -372,68 +355,51 @@ public class BeanFactoryAwareFunctionRegistry logger.debug("Applying function: " + this.functionDefinition); } - Object result = null; + Object result; if (input instanceof Publisher) { input = this.composed ? input : - this.convertInputPublisherIfNecessary((Publisher) input, FunctionTypeUtils.getInputType(functionType, 0)); - if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(functionType, 0))) { + this.convertInputPublisherIfNecessary((Publisher) input, FunctionTypeUtils.getInputType(this.functionType, 0)); + if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(this.functionType, 0))) { result = this.invokeFunction(input); - if (result == null) { - result = Mono.empty(); - } + result = result == null ? Mono.empty() : result; } else { if (this.composed) { return input instanceof Mono - ? Mono.from((Publisher) input).transform((Function) target) - : Flux.from((Publisher) input).transform((Function) target); + ? Mono.from((Publisher) input).transform((Function) this.target) + : Flux.from((Publisher) input).transform((Function) this.target); } else { - boolean isConsumer = FunctionTypeUtils.isConsumer(functionType); - Publisher res; - if (isConsumer) { - res = input instanceof Mono ? Mono.from((Publisher) input).doOnNext((Consumer) this.target).then() + if (FunctionTypeUtils.isConsumer(functionType)) { + result = input instanceof Mono ? Mono.from((Publisher) input).doOnNext((Consumer) this.target).then() : Flux.from((Publisher) input).doOnNext((Consumer) this.target).then(); } else { - res = input instanceof Mono + result = input instanceof Mono ? Mono.from((Publisher) input).map(value -> this.invokeFunction(value)) : Flux.from((Publisher) input).map(value -> this.invokeFunction(value)); } - return res; } } } else { - Type type = FunctionTypeUtils.getInputType(functionType, 0); - if (!composed && !FunctionTypeUtils.isMultipleInputArguments(functionType) && FunctionTypeUtils.isReactive(type)) { + Type type = FunctionTypeUtils.getInputType(this.functionType, 0); + if (!this.composed && !FunctionTypeUtils.isMultipleInputArguments(this.functionType) && FunctionTypeUtils.isReactive(type)) { Publisher publisher = FunctionTypeUtils.isFlux(type) ? input == null ? Flux.empty() : Flux.just(input) : input == null ? Mono.empty() : Mono.just(input); - publisher = this.convertInputPublisherIfNecessary(publisher, FunctionTypeUtils.getInputType(functionType, 0)); - result = this.invokeFunction(publisher); + result = this.invokeFunction(this.convertInputPublisherIfNecessary(publisher, FunctionTypeUtils.getInputType(this.functionType, 0))); } else { result = this.invokeFunction(this.composed ? input - : this.convertInputValueIfNecessary(input, FunctionTypeUtils.getInputType(functionType, 0))); + : this.convertInputValueIfNecessary(input, FunctionTypeUtils.getInputType(this.functionType, 0))); } } - if (!ObjectUtils.isEmpty(acceptedOutputMimeTypes)) { - if (result instanceof Publisher) { - result = this.convertOutputPublisherIfNecessary((Publisher) result, this.acceptedOutputMimeTypes); - } - else { - result = this.convertOutputValueIfNecessary(result, this.acceptedOutputMimeTypes); - } - } - - if (!(result instanceof Publisher) && (!(target instanceof FunctionInvocationWrapper) && target instanceof Supplier)) { - /* - * This is ONLY relevant for web, so consider exposing some property or may be - * the fact that this is a rare case (Supplier) leave it temporarily as is. - */ -// return Flux.just(result); + if (!ObjectUtils.isEmpty(this.acceptedOutputMimeTypes)) { + result = result instanceof Publisher + ? this.convertOutputPublisherIfNecessary((Publisher) result, this.acceptedOutputMimeTypes) + : this.convertOutputValueIfNecessary(result, this.acceptedOutputMimeTypes); } return result; @@ -443,18 +409,14 @@ public class BeanFactoryAwareFunctionRegistry logger.info("Converting output value "); Object convertedValue = null; if (FunctionTypeUtils.isMultipleArgumentsHolder(value)) { - int outputCount = FunctionTypeUtils.getOutputCount(functionType); + int outputCount = FunctionTypeUtils.getOutputCount(this.functionType); Object[] convertedInputArray = new Object[outputCount]; for (int i = 0; i < outputCount; i++) { Expression parsed = new SpelExpressionParser().parseExpression("getT" + (i + 1) + "()"); Object outputArgument = parsed.getValue(value); - if (outputArgument instanceof Publisher) { - outputArgument = this.convertOutputPublisherIfNecessary((Publisher) outputArgument, acceptedOutputMimeTypes[i]); - } - else { - outputArgument = this.convertOutputValueIfNecessary(outputArgument, acceptedOutputMimeTypes); - } - convertedInputArray[i] = outputArgument; + convertedInputArray[i] = outputArgument instanceof Publisher + ? this.convertOutputPublisherIfNecessary((Publisher) outputArgument, acceptedOutputMimeTypes[i]) + : this.convertOutputValueIfNecessary(outputArgument, acceptedOutputMimeTypes); } convertedValue = Tuples.fromArray(convertedInputArray); } @@ -472,7 +434,6 @@ public class BeanFactoryAwareFunctionRegistry // ignore } } - } return convertedValue; diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java index 15f0e8f79..b07120ccf 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java @@ -17,6 +17,7 @@ package org.springframework.cloud.function.context.catalog; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.function.Consumer; @@ -37,6 +38,9 @@ import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; import static org.assertj.core.api.Assertions.assertThat; @@ -65,6 +69,13 @@ public class BeanFactoryAwareFunctionRegistryTests { List result = asFlux.apply(Flux.just("uppercaseFlux", "uppercaseFlux2")).collectList().block(); assertThat(result.get(0)).isEqualTo("UPPERCASEFLUX"); assertThat(result.get(1)).isEqualTo("UPPERCASEFLUX2"); + + Function>, Flux>> messageFlux = catalog.lookup("uppercase", "application/json"); + Message message1 = MessageBuilder.withPayload("\"uppercaseFlux\"".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build(); + Message message2 = MessageBuilder.withPayload("\"uppercaseFlux2\"".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build(); + List> messageResult = messageFlux.apply(Flux.just(message1, message2)).collectList().block(); + assertThat(messageResult.get(0).getPayload()).isEqualTo("\"UPPERCASEFLUX\"".getBytes(StandardCharsets.UTF_8)); + assertThat(messageResult.get(1).getPayload()).isEqualTo("\"UPPERCASEFLUX2\"".getBytes(StandardCharsets.UTF_8)); } /*