GH-828 Add support for configuring additional routers

Resolves #828
This commit is contained in:
Oleg Zhurakousky
2022-03-24 18:26:41 +01:00
parent 4eb4f741a5
commit 03db9baee6
3 changed files with 157 additions and 19 deletions

View File

@@ -145,7 +145,7 @@ public class RoutingFunction implements Function<Object, Object> {
The routing instructions could be communicated in several ways. We support providing instructions via Message headers, System
properties as well as pluggable strategy. So let's look at some of the details
*MessageRoutingCallback*
==== MessageRoutingCallback
The `MessageRoutingCallback` is a strategy to assist with determining the name of the route-to function definition.
@@ -231,7 +231,7 @@ conflict resolutions in the event multiple mechanisms are used at the same time,
3. Application Properties (Any function)
*Function Filtering*
==== Function Filtering
Filtering is the type of routing where there are only two paths - 'go' or 'discard'. In terms of functions it mean
you only want to invoke a certain function if some condition returns 'true', otherwise you want to discard input.
However, when it comes to discarding input there are many interpretation of what it could mean in the context of your application.
@@ -261,6 +261,58 @@ due to the nature of the reactive functions which are invoked only once to pass
is handled by the reactor, hence we can not access and/or rely on the routing instructions communicated via individual
values (e.g., Message).
==== Multiple Routers
By default the framework will always have a single routing function configured as described in previous sections. However, there are times when you may need more then one routing function.
In that case you can create your own instance of the `RoutingFunction` bean in addition to the existing one as long as you give it a name other than `functionRouter`.
You can pass `spring.cloud.function.routing-expression` or `spring.cloud.function.definition` to RoutinFunction as key/value pairs in the map.
Here is a simple example
----
@Configuration
protected static class MultipleRouterConfiguration {
@Bean
RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
Map<String, String> propertiesMap = new HashMap<>();
propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'");
return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback);
}
@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();
}
@Bean
public Function<String, String> uppercase() {
return String::toUpperCase;
}
}
----
and a test that demonstrates how it works
`
----
@Test
public void testMultipleRouters() {
System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'");
FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class);
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("HELLO");
function = functionCatalog.lookup("mySpecialRouter");
assertThat(function).isNotNull();
message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("olleh");
}
----
=== Input/Output Enrichment
There are often times when you need to modify or refine an incoming or outgoing Message and to keep your code clean of non-functional concerns. You dont want to do it inside of your business logic.

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.function.context.config;
import java.util.Map;
import java.util.function.Function;
import org.apache.commons.logging.Log;
@@ -34,12 +35,13 @@ import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.DataBindingPropertyAccessor;
import org.springframework.expression.spel.support.SimpleEvaluationContext;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* An implementation of Function which acts as a gateway/router by actually
* delegating incoming invocation to a function specified .. .
@@ -60,6 +62,9 @@ public class RoutingFunction implements Function<Object, Object> {
private final StandardEvaluationContext evalContext = new StandardEvaluationContext();
private final SimpleEvaluationContext headerEvalContext = SimpleEvaluationContext
.forPropertyAccessors(DataBindingPropertyAccessor.forReadOnlyAccess()).build();
private final SpelExpressionParser spelParser = new SpelExpressionParser();
private final FunctionCatalog functionCatalog;
@@ -72,6 +77,18 @@ public class RoutingFunction implements Function<Object, Object> {
this(functionCatalog, functionProperties, null, null);
}
public RoutingFunction(FunctionCatalog functionCatalog, Map<String, String> propertiesMap,
BeanResolver beanResolver, MessageRoutingCallback routingCallback) {
this(functionCatalog, extractIntoFunctionProperties(propertiesMap), beanResolver, routingCallback);
}
private static FunctionProperties extractIntoFunctionProperties(Map<String, String> propertiesMap) {
FunctionProperties functionProperties = new FunctionProperties();
functionProperties.setDefinition(propertiesMap.get(FunctionProperties.FUNCTION_DEFINITION));
functionProperties.setRoutingExpression(propertiesMap.get(FunctionProperties.PREFIX + ".routing-expression"));
return functionProperties;
}
public RoutingFunction(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
BeanResolver beanResolver, MessageRoutingCallback routingCallback) {
this.functionCatalog = functionCatalog;
@@ -124,7 +141,7 @@ public class RoutingFunction implements Function<Object, Object> {
}
}
else if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.routing-expression"))) {
function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message);
function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message, true);
if (function.isInputTypePublisher()) {
this.assertOriginalInputIsNotPublisher(originalInputIsPublisher);
}
@@ -193,12 +210,16 @@ public class RoutingFunction implements Function<Object, Object> {
}
private FunctionInvocationWrapper functionFromExpression(String routingExpression, Object input) {
return functionFromExpression(routingExpression, input, false);
}
private FunctionInvocationWrapper functionFromExpression(String routingExpression, Object input, boolean isViaHeader) {
Expression expression = spelParser.parseExpression(routingExpression);
if (input instanceof Message) {
input = MessageUtils.toCaseInsensitiveHeadersStructure((Message<?>) input);
}
String functionName = expression.getValue(this.evalContext, input, String.class);
String functionName = isViaHeader ? expression.getValue(this.headerEvalContext, input, String.class) : expression.getValue(this.evalContext, input, String.class);
Assert.hasText(functionName, "Failed to resolve function name based on routing expression '" + functionProperties.getRoutingExpression() + "'");
FunctionInvocationWrapper function = functionCatalog.lookup(functionName);
Assert.notNull(function, "Failed to lookup function to route to based on the expression '"

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.function.context.config;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.junit.jupiter.api.AfterEach;
@@ -24,17 +26,22 @@ import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.expression.BeanFactoryResolver;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
/**
*
@@ -52,13 +59,17 @@ public class RoutingFunctionTests {
context.close();
}
private FunctionCatalog configureCatalog() {
context = new SpringApplicationBuilder(RoutingFunctionConfiguration.class).run(
private FunctionCatalog configureCatalog(Class<?> configurationClass) {
context = new SpringApplicationBuilder(configurationClass).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.routing.enabled=true");
return context.getBean(FunctionCatalog.class);
}
private FunctionCatalog configureCatalog() {
return configureCatalog(RoutingFunctionConfiguration.class);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testInvocationWithMessageAndHeader() {
@@ -91,10 +102,7 @@ public class RoutingFunctionTests {
.setHeader(FunctionProperties.PREFIX + ".definition", "echoFlux").build();
Flux resultFlux = (Flux) function.apply(Flux.just(message));
StepVerifier
.create(resultFlux)
.expectError()
.verify();
StepVerifier.create(resultFlux).expectError().verify();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -106,10 +114,27 @@ public class RoutingFunctionTests {
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".routing-expression", "'echoFlux'").build();
Flux resultFlux = (Flux) function.apply(Flux.just(message));
StepVerifier
.create(resultFlux)
.expectError()
.verify();
StepVerifier.create(resultFlux).expectError().verify();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void failWithHeaderProvidedExpressionAccessingRuntime() {
FunctionCatalog functionCatalog = this.configureCatalog();
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".routing-expression",
"T(java.lang.Runtime).getRuntime().exec(\"open -a calculator.app\")")
.build();
try {
function.apply(message);
fail();
}
catch (Exception e) {
assertThat(e.getMessage()).isEqualTo("EL1005E: Type cannot be found 'java.lang.Runtime'");
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -151,7 +176,8 @@ public class RoutingFunctionTests {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testInvocationWithRoutingBeanExpression() {
System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "@reverse.apply(#root.getHeaders().get('func'))");
System.setProperty(FunctionProperties.PREFIX + ".routing-expression",
"@reverse.apply(#root.getHeaders().get('func'))");
FunctionCatalog functionCatalog = this.configureCatalog();
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
@@ -170,16 +196,17 @@ public class RoutingFunctionTests {
Assertions.fail();
}
catch (Exception e) {
//ignore
// ignore
}
// non existing function
try {
function.apply(MessageBuilder.withPayload("hello").setHeader(FunctionProperties.PREFIX + ".definition", "blah").build());
function.apply(MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".definition", "blah").build());
Assertions.fail();
}
catch (Exception e) {
//ignore
// ignore
}
}
@@ -197,6 +224,22 @@ public class RoutingFunctionTests {
assertThat(function.apply(message)).isEqualTo("OLLEH");
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testMultipleRouters() {
System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'");
FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class);
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("HELLO");
function = functionCatalog.lookup("mySpecialRouter");
assertThat(function).isNotNull();
message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("olleh");
}
@EnableAutoConfiguration
@Configuration
protected static class RoutingFunctionConfiguration {
@@ -216,4 +259,26 @@ public class RoutingFunctionTests {
return f -> f;
}
}
@EnableAutoConfiguration
@Configuration
protected static class MultipleRouterConfiguration {
@Bean
RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
Map<String, String> propertiesMap = new HashMap<>();
propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'");
return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback);
}
@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();
}
@Bean
public Function<String, String> uppercase() {
return String::toUpperCase;
}
}
}