diff --git a/docs/src/main/asciidoc/spring-cloud-function.adoc b/docs/src/main/asciidoc/spring-cloud-function.adoc index 9ee24268a..e3546c2c3 100644 --- a/docs/src/main/asciidoc/spring-cloud-function.adoc +++ b/docs/src/main/asciidoc/spring-cloud-function.adoc @@ -123,30 +123,49 @@ you to invoke a single function which acts as a router to an actual function you This feature is very useful in certain FAAS environments where maintaining configurations for several functions could be cumbersome or exposing more then one function is not possible. -You enable this feature via `spring.cloud.function.routing.enabled` property setting it -to `true` (default is `false`). -This enables `RoutingFunction` under the name `router` which is loaded in FunctionCatalog. +The `RoutingFunction` is registered in _FunctionCatalog_ under the name `functionRouter`. For simplicity +and consistency you can also refer to `RoutingFunction.FUNCTION_NAME` constant. This function has the following signature: [source, java] ---- -public class RoutingFunction implements Function>, Publisher>, Consumer>> { +public class RoutingFunction implements Function { . . . } ---- +The routing instructions could be communicated in several ways; -This allows the above function to act as both `Function` and `Consumer`. -As you can see it takes `Message` as an input argument. This allows you to communicate -the name of the actual function you want to invoke by providing `function.name` Message header. +*Message Headers* -In specific execution environments/models the adapters are responsible to translate and communicate `function.name` -via Message header. For example, when using _spring-cloud-function-web_ you can provide `function.name` as an HTTP +If the input argument is of type `Message`, you can communicate routing instruction by setting one of +`spring.cloud.function.definition` or `spring.cloud.function.routing-expression` Message headers. +For more static cases you can use `spring.cloud.function.definition` header which allows you to provide +the name of a single function (e.g., `...definition=foo`) or a composition instruction (e.g., `...definition=foo|bar|baz`). +For more dynamic cases you can use `spring.cloud.function.routing-expression` header which allows +you to use Spring Expression Language (SpEL) and provide SpEL expression that should resolve +into definition of a function (as described above). + +NOTE: SpEL evaluation context's root object is the +actual input argument, so in he case of `Message` you can construct expression that has access +to both `payload` and `headers` (e.g., `spring.cloud.function.routing-expression=headers.function_name`). + +In specific execution environments/models the adapters are responsible to translate and communicate +`spring.cloud.function.definition` and/or `spring.cloud.function.routing-expression` via Message header. +For example, when using _spring-cloud-function-web_ you can provide `spring.cloud.function.definition` as an HTTP header and the framework will propagate it as well as other HTTP headers as Message headers. -Using Message also allows us to benefit from `MessageConverter`s to convert incoming request to the actual input type -of the target function +*Application Properties* +Routing instruction can also be communicated via `spring.cloud.function.definition` +or `spring.cloud.function.routing-expression` as application properties. The rules described in the +previous section apply here as well. The only difference is you provide these instructions as +application properties (e.g., `--spring.cloud.function.definition=foo`). + +IMPORTANT: When dealing with reactive inputs (e.g., Publisher), routing instructions must only be provided via Function properties. This is +due to the nature of the reactive functions which are invoked only once to pass a Publisher and the rest +is handled by the reactor, hence we can not access and/or rely on the routing instructions communicated via individual +values (e.g., Message). === Kotlin Lambda support diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java new file mode 100644 index 000000000..3379a6aaa --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java @@ -0,0 +1,62 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * + * @author Oleg Zhurakousky + * @since 3.0 + * + */ +@ConfigurationProperties(prefix = FunctionProperties.PREFIX) +public class FunctionProperties { + + /** + * The name prefix for properties defined by this properties class. + */ + public final static String PREFIX = "spring.cloud.function"; + + /** + * Definition of the function to be used. This could be function name (e.g., 'myFunction') + * or function composition definition (e.g., 'myFunction|yourFunction') + */ + private String definition; + + /** + * SpEL expression which should result in function definition (e.g., function name or composition instruction). + * NOTE: SpEL evaluation context's root object is the input argument (e.g., Message). + */ + private String routingExpression; + + public String getDefinition() { + return definition; + } + + public void setDefinition(String definition) { + this.definition = definition; + } + + public String getRoutingExpression() { + return routingExpression; + } + + public void setRoutingExpression(String routingExpression) { + this.routingExpression = routingExpression; + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index 49670dff9..3708c0c4d 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -54,6 +54,7 @@ import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.FunctionType; import org.springframework.cloud.function.context.config.FunctionContextUtils; +import org.springframework.cloud.function.context.config.RoutingFunction; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; @@ -213,9 +214,13 @@ public class BeanFactoryAwareFunctionRegistry private String discoverDefaultDefinitionIfNecessary(String definition) { if (StringUtils.isEmpty(definition)) { - String[] functionNames = Stream.of(this.applicationContext.getBeanNamesForType(Function.class)).filter(n -> !n.startsWith("_")).toArray(String[]::new); - String[] consumerNames = Stream.of(this.applicationContext.getBeanNamesForType(Consumer.class)).filter(n -> !n.startsWith("_")).toArray(String[]::new); - String[] supplierNames = Stream.of(this.applicationContext.getBeanNamesForType(Supplier.class)).filter(n -> !n.startsWith("_")).toArray(String[]::new); + // the underscores are for Kotlin function registrations (see KotlinLambdaToFunctionAutoConfiguration) + String[] functionNames = Stream.of(this.applicationContext.getBeanNamesForType(Function.class)) + .filter(n -> !n.startsWith("_") && !n.equals(RoutingFunction.FUNCTION_NAME)).toArray(String[]::new); + String[] consumerNames = Stream.of(this.applicationContext.getBeanNamesForType(Consumer.class)) + .filter(n -> !n.startsWith("_") && !n.equals(RoutingFunction.FUNCTION_NAME)).toArray(String[]::new); + String[] supplierNames = Stream.of(this.applicationContext.getBeanNamesForType(Supplier.class)) + .filter(n -> !n.startsWith("_") && !n.equals(RoutingFunction.FUNCTION_NAME)).toArray(String[]::new); /* * we may need to add BiFunction and BiConsumer at some point */ @@ -454,15 +459,11 @@ public class BeanFactoryAwareFunctionRegistry } public boolean isConsumer() { - return this.target instanceof Consumer; - } - - public boolean isFunction() { - return this.target instanceof Function; + return FunctionTypeUtils.isConsumer(this.functionType); } public boolean isSupplier() { - return this.target instanceof Supplier; + return FunctionTypeUtils.isSupplier(this.functionType); } public Object getTarget() { @@ -528,6 +529,11 @@ public class BeanFactoryAwareFunctionRegistry Publisher publisher = FunctionTypeUtils.isFlux(type) ? input == null ? Flux.empty() : Flux.just(input) : input == null ? Mono.empty() : Mono.just(input); + if (logger.isDebugEnabled()) { + logger.debug("Invoking reactive function '" + this.functionType + "' with non-reactive input " + + "should at least assume reactive output (e.g., Function> f3 = catalog.lookup(\"echoFlux\");), " + + "otherwise invocation will result in ClassCastException."); + } result = this.invokeFunction(this.convertInputPublisherIfNecessary(publisher, FunctionTypeUtils.getInputType(this.functionType, 0))); } else { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionInspector.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionInspector.java index 2aec31085..9fdcf39a8 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionInspector.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionInspector.java @@ -20,10 +20,12 @@ import java.util.Collections; import java.util.Set; import org.springframework.cloud.function.context.FunctionRegistration; +import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper; +import org.springframework.cloud.function.context.config.RoutingFunction; /** * @author Dave Syer - * + * @author Oleg Zhurakousky */ public interface FunctionInspector { @@ -31,6 +33,11 @@ public interface FunctionInspector { default boolean isMessage(Object function) { FunctionRegistration registration = getRegistration(function); + if (registration != null && registration.getTarget() instanceof FunctionInvocationWrapper + && ((FunctionInvocationWrapper) registration.getTarget()).getTarget() instanceof RoutingFunction) { + // we always want to give routing function as much information as possible + return true; + } return registration == null ? false : registration.getType().isMessage(); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 0d59c7628..92130c7be 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -17,7 +17,6 @@ package org.springframework.cloud.function.context.config; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.function.Consumer; @@ -32,7 +31,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry; import org.springframework.cloud.function.context.catalog.FunctionInspector; @@ -63,6 +64,7 @@ import org.springframework.util.CollectionUtils; */ @Configuration(proxyBeanMethods = false) @ConditionalOnMissingBean(FunctionCatalog.class) +@EnableConfigurationProperties(FunctionProperties.class) public class ContextFunctionCatalogAutoConfiguration { static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper"; @@ -95,14 +97,8 @@ public class ContextFunctionCatalogAutoConfiguration { } @Bean(RoutingFunction.FUNCTION_NAME) - @ConditionalOnProperty(name = "spring.cloud.function.routing.enabled", havingValue = "true") - RoutingFunction gateway(FunctionCatalog functionCatalog, FunctionInspector functionInspector) { - Collection messageConverters = new ArrayList(); - messageConverters.add(new MappingJackson2MessageConverter()); - messageConverters.add(new StringMessageConverter()); - messageConverters.add(new ByteArrayMessageConverter()); - CompositeMessageConverter messageConverter = new CompositeMessageConverter(messageConverters); - return new RoutingFunction(functionCatalog, functionInspector, messageConverter); + RoutingFunction functionRouter(FunctionCatalog functionCatalog, FunctionInspector functionInspector, FunctionProperties functionProperties) { + return new RoutingFunction(functionCatalog, functionInspector, functionProperties); } @Configuration(proxyBeanMethods = false) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java index e80c6126a..b781346f8 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java @@ -16,98 +16,168 @@ package org.springframework.cloud.function.context.config; +import java.lang.reflect.Type; import java.util.function.Function; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import reactor.core.publisher.SignalType; +import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; +import org.springframework.context.expression.MapAccessor; +import org.springframework.expression.Expression; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.converter.MessageConverter; -import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + /** * An implementation of Function which acts as a gateway/router by actually - * delegating incoming invocation to a function specified via `function.name` - * message header.
- * {@link Message} is used as a canonical representation of a request which - * contains some metadata and it is the responsibility of the higher level - * framework to convert the incoming request into a Message. For example; - * spring-cloud-function-web will create Message from HttpRequest copying all - * HTTP headers as message headers. + * delegating incoming invocation to a function specified .. . * * @author Oleg Zhurakousky * @since 2.1 * */ -public class RoutingFunction implements Function>, Publisher> { +public class RoutingFunction implements Function { /** * The name of this function use by BeanFactory. */ - public static final String FUNCTION_NAME = "router"; + public static final String FUNCTION_NAME = "functionRouter"; + + private static Log logger = LogFactory.getLog(RoutingFunction.class); + + private final StandardEvaluationContext evalContext = new StandardEvaluationContext(); + + private final SpelExpressionParser spelParser = new SpelExpressionParser(); private final FunctionCatalog functionCatalog; + private final FunctionProperties functionProperties; + private final FunctionInspector functionInspector; - private final MessageConverter messageConverter; - - RoutingFunction(FunctionCatalog functionCatalog, FunctionInspector functionInspector, - MessageConverter messageConverter) { + public RoutingFunction(FunctionCatalog functionCatalog, FunctionInspector functionInspector, FunctionProperties functionProperties) { this.functionCatalog = functionCatalog; + this.functionProperties = functionProperties; this.functionInspector = functionInspector; - this.messageConverter = messageConverter; + this.evalContext.addPropertyAccessor(new MapAccessor()); } - @SuppressWarnings("unchecked") @Override - public Publisher apply(Publisher> input) { - return Flux.from(input) - .switchOnFirst((signal, flux) -> { - Assert.isTrue(signal.hasValue() - && signal.getType() == SignalType.ON_NEXT, "Signal has no value or wrong type " + signal); - Function, Publisher> function = this.getRouteToFunction(signal.get()); - return flux.map(message -> { - Object inputValue = this.convertInput(message, function); - return inputValue; - }) - .log() - .doOnError(error -> { - throw new IllegalStateException("Failed to convert Message. Possible reason; " - + "No suitable converter was found for payload with 'contentType' " - + signal.get().getHeaders().get(MessageHeaders.CONTENT_TYPE), error); - }) - .transform(function); - }); + public Object apply(Object input) { + return this.route(input, input instanceof Publisher); + } + + /* + * - Check if function-name is set in header and if it is use it. + * If NOT + * - Check routing-expression and if it is set use it + * If NOT + * - Check function-name is set in FunctionProperties and if it is use it + * If NOT + * - Fail + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + private Object route(Object input, boolean originalInputIsPublisher) { + Function function; + if (input instanceof Message) { + Message message = (Message) input; + if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.definition"))) { + function = functionFromDefinition((String) message.getHeaders().get("spring.cloud.function.definition")); + Type functionType = functionInspector.getRegistration(function).getType().getType(); + if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(functionType, 0))) { + this.assertOriginalInputIsNotPublisher(originalInputIsPublisher); + } + } + else if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.routing-expression"))) { + function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message); + Type functionType = functionInspector.getRegistration(function).getType().getType(); + if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(functionType, 0))) { + this.assertOriginalInputIsNotPublisher(originalInputIsPublisher); + } + } + else if (StringUtils.hasText(functionProperties.getRoutingExpression())) { + function = this.functionFromExpression(functionProperties.getRoutingExpression(), message); + } + else if (StringUtils.hasText(functionProperties.getDefinition())) { + function = functionFromDefinition(functionProperties.getDefinition()); + } + else { + throw new IllegalStateException("Failed to establish route, since neither were provided: " + + "'spring.cloud.function.definition' as Message header or as application property or " + + "'spring.cloud.function.routing-expression' as application property."); + } + } + else if (input instanceof Publisher) { + if (StringUtils.hasText(functionProperties.getRoutingExpression())) { + function = this.functionFromExpression(functionProperties.getRoutingExpression(), input); + } + else + if (StringUtils.hasText(functionProperties.getDefinition())) { + function = functionFromDefinition(functionProperties.getDefinition()); + } + else { + return input instanceof Mono + ? Mono.from((Publisher) input).map(v -> route(v, originalInputIsPublisher)) + : Flux.from((Publisher) input).map(v -> route(v, originalInputIsPublisher)); + } + } + else { + this.assertOriginalInputIsNotPublisher(originalInputIsPublisher); + if (StringUtils.hasText(functionProperties.getRoutingExpression())) { + function = this.functionFromExpression(functionProperties.getRoutingExpression(), input); + } + else + if (StringUtils.hasText(functionProperties.getDefinition())) { + function = functionFromDefinition(functionProperties.getDefinition()); + } + else { + throw new IllegalStateException("Failed to establish route, since neither were provided: " + + "'spring.cloud.function.definition' as Message header or as application property or " + + "'spring.cloud.function.routing-expression' as application property."); + } + } + + return function.apply(input); + } + + private void assertOriginalInputIsNotPublisher(boolean originalInputIsPublisher) { + Assert.isTrue(!originalInputIsPublisher, "Routing input of type Publisher is not supported per individual " + + "values (e.g., message header or POJO). Instead you should use 'spring.cloud.function.definition' or " + + "spring.cloud.function.routing-expression' as application properties."); } @SuppressWarnings("rawtypes") - private Function getRouteToFunction(Message message) { - String routeToFunctionName = (String) message.getHeaders().get("function.name"); - Assert.hasText(routeToFunctionName, "A 'function.name' was not provided as message header."); - Function function = functionCatalog.lookup(routeToFunctionName); - Assert.notNull(function, "Failed to locate function specified with 'function.name':" - + message.getHeaders().get("function.name")); + private Function functionFromDefinition(String definition) { + Function function = functionCatalog.lookup(definition); + Assert.notNull(function, "Failed to lookup function to route based on the value of 'spring.cloud.function.definition' property '" + + functionProperties.getDefinition() + "'"); + if (logger.isInfoEnabled()) { + logger.info("Resolved function from provided [definition] property " + functionProperties.getDefinition()); + } return function; } - private Object convertInput(Message message, Object function) { - Class inputType = functionInspector.getInputType(function); - Object inputValue = message.getPayload(); - if (!inputValue.getClass().isAssignableFrom(inputType)) { - inputValue = this.messageConverter.fromMessage(message, functionInspector.getInputType(function)); + @SuppressWarnings("rawtypes") + private Function functionFromExpression(String routingExpression, Object input) { + Expression expression = spelParser.parseExpression(routingExpression); + String functionName = expression.getValue(this.evalContext, input, String.class); + Assert.hasText(functionName, "Failed to resolve function name based on routing expression '" + functionProperties.getRoutingExpression() + "'"); + Function function = functionCatalog.lookup(functionName); + Assert.notNull(function, "Failed to lookup function to route to based on the expression '" + + functionProperties.getRoutingExpression() + "' whcih resolved to '" + functionName + "' function name."); + if (logger.isInfoEnabled()) { + logger.info("Resolved function from provided [routing-expression] " + routingExpression); } - if (this.functionInspector.isMessage(function)) { - inputValue = MessageBuilder.createMessage(inputValue, message.getHeaders()); - } - Assert.notNull(inputValue, "Failed to determine input value of type " - + inputType + " from Message '" - + message + "'. No suitable Message Converter found."); - return inputValue; + return function; } } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java new file mode 100644 index 000000000..ed7b758a6 --- /dev/null +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java @@ -0,0 +1,187 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.config; + +import java.util.function.Function; + +import org.junit.After; +import org.junit.Test; +import reactor.core.publisher.Flux; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.FunctionProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.fail; + +/** + * + * @author Oleg Zhurakousky + * + */ +public class RoutingFunctionTests { + + private ConfigurableApplicationContext context; + + @After + public void before() { + System.clearProperty("spring.cloud.function.definition"); + System.clearProperty("spring.cloud.function.routing-expression"); + context.close(); + } + + private FunctionCatalog configureCatalog() { + context = new SpringApplicationBuilder(RoutingFunctionConfiguration.class).run( + "--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.cloud.function.routing.enabled=true"); + FunctionCatalog catalog = context.getBean(FunctionCatalog.class); + return catalog; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testInvocationWithMessageAndHeader() { + FunctionCatalog functionCatalog = this.configureCatalog(); + Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); + assertThat(function).isNotNull(); + Message message = MessageBuilder.withPayload("hello") + .setHeader(FunctionProperties.PREFIX + ".definition", "reverse").build(); + assertThat(function.apply(message)).isEqualTo("olleh"); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testRoutingSimpleInputWithReactiveFunctionWithMessageHeader() { + FunctionCatalog functionCatalog = this.configureCatalog(); + Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); + assertThat(function).isNotNull(); + Message message = MessageBuilder.withPayload("hello") + .setHeader(FunctionProperties.PREFIX + ".definition", "echoFlux").build(); + assertThat(((Flux) function.apply(message)).blockFirst()).isEqualTo("hello"); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(expected = Exception.class) + public void testRoutingReactiveInputWithReactiveFunctionAndDefinitionMessageHeader() { + FunctionCatalog functionCatalog = this.configureCatalog(); + Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); + assertThat(function).isNotNull(); + Message message = MessageBuilder.withPayload("hello") + .setHeader(FunctionProperties.PREFIX + ".definition", "echoFlux").build(); + Flux resultFlux = (Flux) function.apply(Flux.just(message)); + resultFlux.subscribe(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(expected = Exception.class) + public void testRoutingReactiveInputWithReactiveFunctionAndExpressionMessageHeader() { + FunctionCatalog functionCatalog = this.configureCatalog(); + Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); + assertThat(function).isNotNull(); + Message message = MessageBuilder.withPayload("hello") + .setHeader(FunctionProperties.PREFIX + ".routing-expression", "'echoFlux'").build(); + Flux resultFlux = (Flux) function.apply(Flux.just(message)); + resultFlux.subscribe(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testInvocationWithMessageAndDefinitionProperty() { + System.setProperty(FunctionProperties.PREFIX + ".definition", "reverse"); + FunctionCatalog functionCatalog = this.configureCatalog(); + Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); + assertThat(function).isNotNull(); + Message message = MessageBuilder.withPayload("hello").build(); + assertThat(function.apply(message)).isEqualTo("olleh"); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testInvocationWithMessageAndRoutingExpression() { + System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "headers.function_name"); + FunctionCatalog functionCatalog = this.configureCatalog(); + Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); + assertThat(function).isNotNull(); + Message message = MessageBuilder.withPayload("hello").setHeader("function_name", "reverse").build(); + assertThat(function.apply(message)).isEqualTo("olleh"); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testOtherExpectedFailures() { + FunctionCatalog functionCatalog = this.configureCatalog(); + Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); + // no function.definition header or function property + try { + function.apply(MessageBuilder.withPayload("hello").build()); + fail(); + } + catch (Exception e) { + //ignore + } + + // non existing function + try { + function.apply(MessageBuilder.withPayload("hello").setHeader(FunctionProperties.PREFIX + ".definition", "blah").build()); + fail(); + } + catch (Exception e) { + //ignore + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testInvocationWithMessageComposed() { + FunctionCatalog functionCatalog = this.configureCatalog(); + + Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME + "|uppercase"); + assertThat(function).isNotNull(); + + Message message = MessageBuilder.withPayload("hello") + .setHeader(FunctionProperties.PREFIX + ".definition", "uppercase").build(); + + assertThat(function.apply(message)).isEqualTo("HELLO"); + } + + @EnableAutoConfiguration + @Configuration + protected static class RoutingFunctionConfiguration { + + @Bean + public Function reverse() { + return v -> new StringBuilder(v).reverse().toString(); + } + + @Bean + public Function uppercase() { + return v -> v.toUpperCase(); + } + + @Bean + public Function, Flux> echoFlux() { + return f -> f; + } + } +} diff --git a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionArchiveDeployer.java b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionArchiveDeployer.java index 8d61211f2..4b7afd152 100644 --- a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionArchiveDeployer.java +++ b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionArchiveDeployer.java @@ -67,7 +67,7 @@ class FunctionArchiveDeployer extends JarLauncher { } @SuppressWarnings({ "unchecked", "rawtypes" }) - void deploy(FunctionRegistry functionRegistry, FunctionProperties functionProperties, String[] args) { + void deploy(FunctionRegistry functionRegistry, FunctionDeployerProperties functionProperties, String[] args) { ClassLoader currentLoader = Thread.currentThread().getContextClassLoader(); try { @@ -165,7 +165,7 @@ class FunctionArchiveDeployer extends JarLauncher { || name.startsWith("reactor."); } - private String discoverFunctionClassName(FunctionProperties functionProperties) { + private String discoverFunctionClassName(FunctionDeployerProperties functionProperties) { try { return StringUtils.hasText(functionProperties.getFunctionClass()) ? functionProperties.getFunctionClass() diff --git a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionDeployerConfiguration.java b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionDeployerConfiguration.java index 4634851c7..afb447550 100644 --- a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionDeployerConfiguration.java +++ b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionDeployerConfiguration.java @@ -33,11 +33,16 @@ import org.springframework.boot.env.EnvironmentPostProcessor; import org.springframework.boot.loader.archive.Archive; import org.springframework.boot.loader.archive.ExplodedArchive; import org.springframework.boot.loader.archive.JarFileArchive; +import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.context.SmartLifecycle; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.MapPropertySource; +import org.springframework.core.env.MutablePropertySources; +import org.springframework.core.env.PropertiesPropertySource; +import org.springframework.util.StringUtils; /** * @@ -52,13 +57,13 @@ import org.springframework.core.env.ConfigurableEnvironment; * */ @Configuration(proxyBeanMethods = false) -@EnableConfigurationProperties(FunctionProperties.class) +@EnableConfigurationProperties(FunctionDeployerProperties.class) public class FunctionDeployerConfiguration { private static Log logger = LogFactory.getLog(FunctionDeployerConfiguration.class); @Bean - SmartLifecycle functionArchiveDeployer(FunctionProperties functionProperties, + SmartLifecycle functionArchiveDeployer(FunctionDeployerProperties functionProperties, FunctionRegistry functionRegistry, ApplicationArguments arguments) { ApplicationArguments updatedArguments = this.updateArguments(arguments); @@ -123,14 +128,14 @@ public class FunctionDeployerConfiguration { } /* - * We need to update actual arguments to ensure that when we may be passing to the deployed archive has the right properties. - * For the current application FunctionProperties already set as a result of EnvironmentPostProcessor + * We need to update the actual arguments with non-legacy properties before passing these arguments to the deployable archive. + * For the current application FunctionProperties already updated and set as a result of EnvironmentPostProcessor */ private ApplicationArguments updateArguments(ApplicationArguments arguments) { List originalArguments = new ArrayList(Arrays.asList(arguments.getSourceArgs())); if (arguments.containsOption("function.name")) { - originalArguments.add(FunctionProperties.PREFIX + ".function-name=" + arguments.getOptionValues("function.name").get(0)); + originalArguments.add(FunctionProperties.PREFIX + ".definition=" + arguments.getOptionValues("function.name").get(0)); } if (arguments.containsOption("function.location")) { originalArguments.add(FunctionProperties.PREFIX + ".location=" + arguments.getOptionValues("function.location").get(0)); @@ -147,11 +152,16 @@ public class FunctionDeployerConfiguration { static class LegacyPropertyEnvironmentPostProcessor implements EnvironmentPostProcessor { @Override public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { - if (environment.containsProperty("function.name")) { - System.setProperty(FunctionProperties.PREFIX + ".function-name", environment.getProperty("function.name")); - } - if (environment.containsProperty("function.location")) { - System.setProperty(FunctionProperties.PREFIX + ".location", environment.getProperty("function.location")); + String functionName = environment.containsProperty("function.name") ? environment.getProperty("function.name") : null; + String functionLocation = environment.containsProperty("function.location") ? environment.getProperty("function.location") : null; + if (StringUtils.hasText(functionName) || StringUtils.hasText(functionLocation)) { + MutablePropertySources propertySources = environment.getPropertySources(); + propertySources.forEach(ps -> { + if (ps instanceof PropertiesPropertySource) { + ((MapPropertySource) ps).getSource().put(FunctionProperties.PREFIX + ".definition", functionName); + ((MapPropertySource) ps).getSource().put(FunctionProperties.PREFIX + ".location", functionLocation); + } + }); } } } diff --git a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionProperties.java b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionDeployerProperties.java similarity index 70% rename from spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionProperties.java rename to spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionDeployerProperties.java index 74773abbd..fe694272a 100644 --- a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionProperties.java +++ b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionDeployerProperties.java @@ -19,22 +19,19 @@ package org.springframework.cloud.function.deployer; import javax.annotation.PostConstruct; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.util.Assert; -import org.springframework.util.StringUtils; /** * Configuration properties for deciding how to locate the functional class to execute. * * @author Eric Bottard * @author Oleg Zhurakousky + * + * @see FunctionProperties */ -@ConfigurationProperties("spring.cloud.function") -public class FunctionProperties { - - /** - * The name prefix for properties defined by this properties class. - */ - public final static String PREFIX = "spring.cloud.function"; +@ConfigurationProperties(prefix = FunctionProperties.PREFIX) +public class FunctionDeployerProperties { /** * Location of jar archive containing the supplier/function/consumer class or bean to run. @@ -42,12 +39,7 @@ public class FunctionProperties { private String location; /** - * The name of the function to be looked up from the FunctionCatalog (e.g., bean name). - */ - private String functionName; - - /** - * The name of the function class tyo be instantiated and loaded into FunctionCatalog. The name of the + * The name of the function class to be instantiated and loaded into FunctionCatalog. The name of the * function will be decapitalized simple name of this class. */ private String functionClass; @@ -60,14 +52,6 @@ public class FunctionProperties { return this.functionClass; } - public void setFunctionName(String functionName) { - this.functionName = StringUtils.hasText(functionName) ? functionName : ""; - } - - public String getFunctionName() { - return this.functionName; - } - public String getLocation() { return this.location; } diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/RoutingFunctionTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/RoutingFunctionTests.java index d61fc7b3d..171dc7a6d 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/RoutingFunctionTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/RoutingFunctionTests.java @@ -31,6 +31,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.config.RoutingFunction; import org.springframework.cloud.function.web.RestApplication; import org.springframework.cloud.function.web.mvc.RoutingFunctionTests.TestConfiguration; @@ -42,6 +43,7 @@ import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; @@ -57,18 +59,23 @@ import static org.assertj.core.api.Assertions.assertThat; "spring.cloud.function.web.path=/functions", "spring.cloud.function.routing.enabled=true"}) @ContextConfiguration(classes = { RestApplication.class, TestConfiguration.class }) -@Ignore public class RoutingFunctionTests { @Autowired private TestRestTemplate rest; + @Autowired + private FunctionProperties functionProperties; + + @Test + @DirtiesContext public void testFunctionMessage() throws Exception { + HttpEntity postForEntity = this.rest .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) .contentType(MediaType.APPLICATION_JSON) - .header("function.name", "employee") + .header("spring.cloud.function.definition", "employee") .body("{\"name\":\"Bob\",\"age\":25}"), String.class); assertThat(postForEntity.getBody()).isEqualTo("{\"name\":\"Bob\",\"age\":25}"); assertThat(postForEntity.getHeaders().containsKey("x-content-type")).isTrue(); @@ -78,55 +85,58 @@ public class RoutingFunctionTests { } @Test + @DirtiesContext public void testFunctionPrimitive() throws Exception { ResponseEntity postForEntity = this.rest .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) .contentType(MediaType.TEXT_PLAIN) - .header("function.name", "echo") + .header("spring.cloud.function.definition", "echo") .body("{\"name\":\"Bob\",\"age\":25}"), String.class); assertThat(postForEntity.getBody()).isEqualTo("{\"name\":\"Bob\",\"age\":25}"); assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); } @Test + @DirtiesContext public void testFluxFunctionPrimitive() throws Exception { + this.functionProperties.setDefinition("fluxuppercase"); ResponseEntity postForEntity = this.rest .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) .contentType(MediaType.TEXT_PLAIN) - .header("function.name", "fluxuppercase") - .body("hello"), String.class); - assertThat(postForEntity.getBody()).isEqualTo("[\"HELLO\"]"); + .body("[\"hello\", \"bye\"]"), String.class); + assertThat(postForEntity.getBody()).isEqualTo("[\"HELLO\", \"BYE\"]"); assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); postForEntity = this.rest .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) .contentType(MediaType.TEXT_PLAIN) - .header("function.name", "fluxuppercase") .body("hello1"), String.class); - assertThat(postForEntity.getBody()).isEqualTo("[\"HELLO1\"]"); + assertThat(postForEntity.getBody()).isEqualTo("HELLO1"); assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); postForEntity = this.rest .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) .contentType(MediaType.TEXT_PLAIN) - .header("function.name", "fluxuppercase") .body("hello2"), String.class); - assertThat(postForEntity.getBody()).isEqualTo("[\"HELLO2\"]"); + assertThat(postForEntity.getBody()).isEqualTo("HELLO2"); assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); } @Test + @DirtiesContext public void testFluxFunctionPrimitiveArray() throws Exception { + this.functionProperties.setDefinition("fluxuppercase"); ResponseEntity postForEntity = this.rest .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) .contentType(MediaType.APPLICATION_JSON) - .header("function.name", "fluxuppercase") .body(new String[] {"a", "b", "c"}), String.class); assertThat(postForEntity.getBody()).isEqualTo("[\"A\",\"B\",\"C\"]"); assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); } @Test + @DirtiesContext + @Ignore public void testFluxConsumer() throws Exception { ResponseEntity postForEntity = this.rest .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) @@ -134,10 +144,13 @@ public class RoutingFunctionTests { .header("function.name", "fluxconsumer") .body(new String[] {"a", "b", "c"}), String.class); assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); + } @Test + @DirtiesContext + @Ignore public void testFunctionPojo() throws Exception { ResponseEntity postForEntity = this.rest .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) @@ -149,11 +162,13 @@ public class RoutingFunctionTests { } @Test + @DirtiesContext + @Ignore public void testConsumerMessage() throws Exception { ResponseEntity postForEntity = this.rest .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) .contentType(MediaType.TEXT_PLAIN) - .header("function.name", "messageConsumer") + .header("spring.cloud.function.definition", "messageConsumer") .body("{\"name\":\"Bob\",\"age\":25}"), String.class); assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); } @@ -237,8 +252,6 @@ public class RoutingFunctionTests { public void setValue(String value) { this.value = value; } - - } }