GH-408 Enhance RoutingFunction with SpEL and application properties

- Added initial support for communicating routing instructions via SpEL thru both message headers and application properties
- Added support for communication function definition via application properties
- Added additional tests and updated documentation

Resolves #408
This commit is contained in:
Oleg Zhurakousky
2019-09-04 18:30:12 +02:00
parent 11ac6cd679
commit 2aed5abff8
11 changed files with 487 additions and 133 deletions

View File

@@ -0,0 +1,62 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
*
* @author Oleg Zhurakousky
* @since 3.0
*
*/
@ConfigurationProperties(prefix = FunctionProperties.PREFIX)
public class FunctionProperties {
/**
* The name prefix for properties defined by this properties class.
*/
public final static String PREFIX = "spring.cloud.function";
/**
* Definition of the function to be used. This could be function name (e.g., 'myFunction')
* or function composition definition (e.g., 'myFunction|yourFunction')
*/
private String definition;
/**
* SpEL expression which should result in function definition (e.g., function name or composition instruction).
* NOTE: SpEL evaluation context's root object is the input argument (e.g., Message).
*/
private String routingExpression;
public String getDefinition() {
return definition;
}
public void setDefinition(String definition) {
this.definition = definition;
}
public String getRoutingExpression() {
return routingExpression;
}
public void setRoutingExpression(String routingExpression) {
this.routingExpression = routingExpression;
}
}

View File

@@ -54,6 +54,7 @@ import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.config.FunctionContextUtils;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
@@ -213,9 +214,13 @@ public class BeanFactoryAwareFunctionRegistry
private String discoverDefaultDefinitionIfNecessary(String definition) {
if (StringUtils.isEmpty(definition)) {
String[] functionNames = Stream.of(this.applicationContext.getBeanNamesForType(Function.class)).filter(n -> !n.startsWith("_")).toArray(String[]::new);
String[] consumerNames = Stream.of(this.applicationContext.getBeanNamesForType(Consumer.class)).filter(n -> !n.startsWith("_")).toArray(String[]::new);
String[] supplierNames = Stream.of(this.applicationContext.getBeanNamesForType(Supplier.class)).filter(n -> !n.startsWith("_")).toArray(String[]::new);
// the underscores are for Kotlin function registrations (see KotlinLambdaToFunctionAutoConfiguration)
String[] functionNames = Stream.of(this.applicationContext.getBeanNamesForType(Function.class))
.filter(n -> !n.startsWith("_") && !n.equals(RoutingFunction.FUNCTION_NAME)).toArray(String[]::new);
String[] consumerNames = Stream.of(this.applicationContext.getBeanNamesForType(Consumer.class))
.filter(n -> !n.startsWith("_") && !n.equals(RoutingFunction.FUNCTION_NAME)).toArray(String[]::new);
String[] supplierNames = Stream.of(this.applicationContext.getBeanNamesForType(Supplier.class))
.filter(n -> !n.startsWith("_") && !n.equals(RoutingFunction.FUNCTION_NAME)).toArray(String[]::new);
/*
* we may need to add BiFunction and BiConsumer at some point
*/
@@ -454,15 +459,11 @@ public class BeanFactoryAwareFunctionRegistry
}
public boolean isConsumer() {
return this.target instanceof Consumer;
}
public boolean isFunction() {
return this.target instanceof Function;
return FunctionTypeUtils.isConsumer(this.functionType);
}
public boolean isSupplier() {
return this.target instanceof Supplier;
return FunctionTypeUtils.isSupplier(this.functionType);
}
public Object getTarget() {
@@ -528,6 +529,11 @@ public class BeanFactoryAwareFunctionRegistry
Publisher<?> publisher = FunctionTypeUtils.isFlux(type)
? input == null ? Flux.empty() : Flux.just(input)
: input == null ? Mono.empty() : Mono.just(input);
if (logger.isDebugEnabled()) {
logger.debug("Invoking reactive function '" + this.functionType + "' with non-reactive input "
+ "should at least assume reactive output (e.g., Function<String, Flux<String>> f3 = catalog.lookup(\"echoFlux\");), "
+ "otherwise invocation will result in ClassCastException.");
}
result = this.invokeFunction(this.convertInputPublisherIfNecessary(publisher, FunctionTypeUtils.getInputType(this.functionType, 0)));
}
else {

View File

@@ -20,10 +20,12 @@ import java.util.Collections;
import java.util.Set;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.config.RoutingFunction;
/**
* @author Dave Syer
*
* @author Oleg Zhurakousky
*/
public interface FunctionInspector {
@@ -31,6 +33,11 @@ public interface FunctionInspector {
default boolean isMessage(Object function) {
FunctionRegistration<?> registration = getRegistration(function);
if (registration != null && registration.getTarget() instanceof FunctionInvocationWrapper
&& ((FunctionInvocationWrapper) registration.getTarget()).getTarget() instanceof RoutingFunction) {
// we always want to give routing function as much information as possible
return true;
}
return registration == null ? false : registration.getType().isMessage();
}

View File

@@ -17,7 +17,6 @@
package org.springframework.cloud.function.context.config;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@@ -32,7 +31,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
@@ -63,6 +64,7 @@ import org.springframework.util.CollectionUtils;
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(FunctionCatalog.class)
@EnableConfigurationProperties(FunctionProperties.class)
public class ContextFunctionCatalogAutoConfiguration {
static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper";
@@ -95,14 +97,8 @@ public class ContextFunctionCatalogAutoConfiguration {
}
@Bean(RoutingFunction.FUNCTION_NAME)
@ConditionalOnProperty(name = "spring.cloud.function.routing.enabled", havingValue = "true")
RoutingFunction gateway(FunctionCatalog functionCatalog, FunctionInspector functionInspector) {
Collection<MessageConverter> messageConverters = new ArrayList<MessageConverter>();
messageConverters.add(new MappingJackson2MessageConverter());
messageConverters.add(new StringMessageConverter());
messageConverters.add(new ByteArrayMessageConverter());
CompositeMessageConverter messageConverter = new CompositeMessageConverter(messageConverters);
return new RoutingFunction(functionCatalog, functionInspector, messageConverter);
RoutingFunction functionRouter(FunctionCatalog functionCatalog, FunctionInspector functionInspector, FunctionProperties functionProperties) {
return new RoutingFunction(functionCatalog, functionInspector, functionProperties);
}
@Configuration(proxyBeanMethods = false)

View File

@@ -16,98 +16,168 @@
package org.springframework.cloud.function.context.config;
import java.lang.reflect.Type;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Mono;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* An implementation of Function which acts as a gateway/router by actually
* delegating incoming invocation to a function specified via `function.name`
* message header. <br>
* {@link Message} is used as a canonical representation of a request which
* contains some metadata and it is the responsibility of the higher level
* framework to convert the incoming request into a Message. For example;
* spring-cloud-function-web will create Message from HttpRequest copying all
* HTTP headers as message headers.
* delegating incoming invocation to a function specified .. .
*
* @author Oleg Zhurakousky
* @since 2.1
*
*/
public class RoutingFunction implements Function<Publisher<Message<?>>, Publisher<?>> {
public class RoutingFunction implements Function<Object, Object> {
/**
* The name of this function use by BeanFactory.
*/
public static final String FUNCTION_NAME = "router";
public static final String FUNCTION_NAME = "functionRouter";
private static Log logger = LogFactory.getLog(RoutingFunction.class);
private final StandardEvaluationContext evalContext = new StandardEvaluationContext();
private final SpelExpressionParser spelParser = new SpelExpressionParser();
private final FunctionCatalog functionCatalog;
private final FunctionProperties functionProperties;
private final FunctionInspector functionInspector;
private final MessageConverter messageConverter;
RoutingFunction(FunctionCatalog functionCatalog, FunctionInspector functionInspector,
MessageConverter messageConverter) {
public RoutingFunction(FunctionCatalog functionCatalog, FunctionInspector functionInspector, FunctionProperties functionProperties) {
this.functionCatalog = functionCatalog;
this.functionProperties = functionProperties;
this.functionInspector = functionInspector;
this.messageConverter = messageConverter;
this.evalContext.addPropertyAccessor(new MapAccessor());
}
@SuppressWarnings("unchecked")
@Override
public Publisher<?> apply(Publisher<Message<?>> input) {
return Flux.from(input)
.switchOnFirst((signal, flux) -> {
Assert.isTrue(signal.hasValue()
&& signal.getType() == SignalType.ON_NEXT, "Signal has no value or wrong type " + signal);
Function<Flux<Object>, Publisher<Object>> function = this.getRouteToFunction(signal.get());
return flux.map(message -> {
Object inputValue = this.convertInput(message, function);
return inputValue;
})
.log()
.doOnError(error -> {
throw new IllegalStateException("Failed to convert Message. Possible reason; "
+ "No suitable converter was found for payload with 'contentType' "
+ signal.get().getHeaders().get(MessageHeaders.CONTENT_TYPE), error);
})
.transform(function);
});
public Object apply(Object input) {
return this.route(input, input instanceof Publisher);
}
/*
* - Check if function-name is set in header and if it is use it.
* If NOT
* - Check routing-expression and if it is set use it
* If NOT
* - Check function-name is set in FunctionProperties and if it is use it
* If NOT
* - Fail
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
private Object route(Object input, boolean originalInputIsPublisher) {
Function function;
if (input instanceof Message) {
Message<?> message = (Message<?>) input;
if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.definition"))) {
function = functionFromDefinition((String) message.getHeaders().get("spring.cloud.function.definition"));
Type functionType = functionInspector.getRegistration(function).getType().getType();
if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(functionType, 0))) {
this.assertOriginalInputIsNotPublisher(originalInputIsPublisher);
}
}
else if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.routing-expression"))) {
function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message);
Type functionType = functionInspector.getRegistration(function).getType().getType();
if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(functionType, 0))) {
this.assertOriginalInputIsNotPublisher(originalInputIsPublisher);
}
}
else if (StringUtils.hasText(functionProperties.getRoutingExpression())) {
function = this.functionFromExpression(functionProperties.getRoutingExpression(), message);
}
else if (StringUtils.hasText(functionProperties.getDefinition())) {
function = functionFromDefinition(functionProperties.getDefinition());
}
else {
throw new IllegalStateException("Failed to establish route, since neither were provided: "
+ "'spring.cloud.function.definition' as Message header or as application property or "
+ "'spring.cloud.function.routing-expression' as application property.");
}
}
else if (input instanceof Publisher) {
if (StringUtils.hasText(functionProperties.getRoutingExpression())) {
function = this.functionFromExpression(functionProperties.getRoutingExpression(), input);
}
else
if (StringUtils.hasText(functionProperties.getDefinition())) {
function = functionFromDefinition(functionProperties.getDefinition());
}
else {
return input instanceof Mono
? Mono.from((Publisher<?>) input).map(v -> route(v, originalInputIsPublisher))
: Flux.from((Publisher<?>) input).map(v -> route(v, originalInputIsPublisher));
}
}
else {
this.assertOriginalInputIsNotPublisher(originalInputIsPublisher);
if (StringUtils.hasText(functionProperties.getRoutingExpression())) {
function = this.functionFromExpression(functionProperties.getRoutingExpression(), input);
}
else
if (StringUtils.hasText(functionProperties.getDefinition())) {
function = functionFromDefinition(functionProperties.getDefinition());
}
else {
throw new IllegalStateException("Failed to establish route, since neither were provided: "
+ "'spring.cloud.function.definition' as Message header or as application property or "
+ "'spring.cloud.function.routing-expression' as application property.");
}
}
return function.apply(input);
}
private void assertOriginalInputIsNotPublisher(boolean originalInputIsPublisher) {
Assert.isTrue(!originalInputIsPublisher, "Routing input of type Publisher is not supported per individual "
+ "values (e.g., message header or POJO). Instead you should use 'spring.cloud.function.definition' or "
+ "spring.cloud.function.routing-expression' as application properties.");
}
@SuppressWarnings("rawtypes")
private Function getRouteToFunction(Message<?> message) {
String routeToFunctionName = (String) message.getHeaders().get("function.name");
Assert.hasText(routeToFunctionName, "A 'function.name' was not provided as message header.");
Function function = functionCatalog.lookup(routeToFunctionName);
Assert.notNull(function, "Failed to locate function specified with 'function.name':"
+ message.getHeaders().get("function.name"));
private Function functionFromDefinition(String definition) {
Function function = functionCatalog.lookup(definition);
Assert.notNull(function, "Failed to lookup function to route based on the value of 'spring.cloud.function.definition' property '"
+ functionProperties.getDefinition() + "'");
if (logger.isInfoEnabled()) {
logger.info("Resolved function from provided [definition] property " + functionProperties.getDefinition());
}
return function;
}
private Object convertInput(Message<?> message, Object function) {
Class<?> inputType = functionInspector.getInputType(function);
Object inputValue = message.getPayload();
if (!inputValue.getClass().isAssignableFrom(inputType)) {
inputValue = this.messageConverter.fromMessage(message, functionInspector.getInputType(function));
@SuppressWarnings("rawtypes")
private Function functionFromExpression(String routingExpression, Object input) {
Expression expression = spelParser.parseExpression(routingExpression);
String functionName = expression.getValue(this.evalContext, input, String.class);
Assert.hasText(functionName, "Failed to resolve function name based on routing expression '" + functionProperties.getRoutingExpression() + "'");
Function function = functionCatalog.lookup(functionName);
Assert.notNull(function, "Failed to lookup function to route to based on the expression '"
+ functionProperties.getRoutingExpression() + "' whcih resolved to '" + functionName + "' function name.");
if (logger.isInfoEnabled()) {
logger.info("Resolved function from provided [routing-expression] " + routingExpression);
}
if (this.functionInspector.isMessage(function)) {
inputValue = MessageBuilder.createMessage(inputValue, message.getHeaders());
}
Assert.notNull(inputValue, "Failed to determine input value of type "
+ inputType + " from Message '"
+ message + "'. No suitable Message Converter found.");
return inputValue;
return function;
}
}

View File

@@ -0,0 +1,187 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.config;
import java.util.function.Function;
import org.junit.After;
import org.junit.Test;
import reactor.core.publisher.Flux;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
/**
*
* @author Oleg Zhurakousky
*
*/
public class RoutingFunctionTests {
private ConfigurableApplicationContext context;
@After
public void before() {
System.clearProperty("spring.cloud.function.definition");
System.clearProperty("spring.cloud.function.routing-expression");
context.close();
}
private FunctionCatalog configureCatalog() {
context = new SpringApplicationBuilder(RoutingFunctionConfiguration.class).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.routing.enabled=true");
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
return catalog;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testInvocationWithMessageAndHeader() {
FunctionCatalog functionCatalog = this.configureCatalog();
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".definition", "reverse").build();
assertThat(function.apply(message)).isEqualTo("olleh");
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testRoutingSimpleInputWithReactiveFunctionWithMessageHeader() {
FunctionCatalog functionCatalog = this.configureCatalog();
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".definition", "echoFlux").build();
assertThat(((Flux) function.apply(message)).blockFirst()).isEqualTo("hello");
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test(expected = Exception.class)
public void testRoutingReactiveInputWithReactiveFunctionAndDefinitionMessageHeader() {
FunctionCatalog functionCatalog = this.configureCatalog();
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".definition", "echoFlux").build();
Flux resultFlux = (Flux) function.apply(Flux.just(message));
resultFlux.subscribe();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test(expected = Exception.class)
public void testRoutingReactiveInputWithReactiveFunctionAndExpressionMessageHeader() {
FunctionCatalog functionCatalog = this.configureCatalog();
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".routing-expression", "'echoFlux'").build();
Flux resultFlux = (Flux) function.apply(Flux.just(message));
resultFlux.subscribe();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testInvocationWithMessageAndDefinitionProperty() {
System.setProperty(FunctionProperties.PREFIX + ".definition", "reverse");
FunctionCatalog functionCatalog = this.configureCatalog();
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("olleh");
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testInvocationWithMessageAndRoutingExpression() {
System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "headers.function_name");
FunctionCatalog functionCatalog = this.configureCatalog();
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello").setHeader("function_name", "reverse").build();
assertThat(function.apply(message)).isEqualTo("olleh");
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testOtherExpectedFailures() {
FunctionCatalog functionCatalog = this.configureCatalog();
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
// no function.definition header or function property
try {
function.apply(MessageBuilder.withPayload("hello").build());
fail();
}
catch (Exception e) {
//ignore
}
// non existing function
try {
function.apply(MessageBuilder.withPayload("hello").setHeader(FunctionProperties.PREFIX + ".definition", "blah").build());
fail();
}
catch (Exception e) {
//ignore
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testInvocationWithMessageComposed() {
FunctionCatalog functionCatalog = this.configureCatalog();
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME + "|uppercase");
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".definition", "uppercase").build();
assertThat(function.apply(message)).isEqualTo("HELLO");
}
@EnableAutoConfiguration
@Configuration
protected static class RoutingFunctionConfiguration {
@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();
}
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
@Bean
public Function<Flux<String>, Flux<String>> echoFlux() {
return f -> f;
}
}
}