From 3c87e00d8ecca093a6491fed84729de30594e609 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 17 Nov 2022 11:27:32 +0100 Subject: [PATCH] GH-958 Add support for default routing Resolves #958 --- .../main/asciidoc/spring-cloud-function.adoc | 21 ++++++++ .../context/DefaultMessageRoutingHandler.java | 51 +++++++++++++++++++ .../function/context/FunctionProperties.java | 17 ++++--- .../catalog/SimpleFunctionRegistry.java | 1 + ...ntextFunctionCatalogAutoConfiguration.java | 12 ++++- .../context/config/RoutingFunction.java | 46 +++++++++++------ .../context/config/RoutingFunctionTests.java | 47 +++++++++++++++++ 7 files changed, 172 insertions(+), 23 deletions(-) create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/DefaultMessageRoutingHandler.java diff --git a/docs/src/main/asciidoc/spring-cloud-function.adoc b/docs/src/main/asciidoc/spring-cloud-function.adoc index aa1e2ceec..a336498e9 100644 --- a/docs/src/main/asciidoc/spring-cloud-function.adoc +++ b/docs/src/main/asciidoc/spring-cloud-function.adoc @@ -251,6 +251,27 @@ conflict resolutions in the event multiple mechanisms are used at the same time, 2. Message Headers (If function is imperative and no `MessageRoutingCallback` provided) 3. Application Properties (Any function) +*Unroutable Messages* + +In the event route-to function is not available in catalog you will get an exception stating that. + +There are cases when such behavior is not desired and you may want to have some "catch-all" type function which can handle such messages. +To accomplish that, framework provides `org.springframework.cloud.function.context.DefaultMessageRoutingHandler` strategy. All you need to do is register it as a bean. +Its default implementation will simply log the fact that the message is un-routable, but will allow message flow to proceed without the exception, effectively dropping the un-routable message. +If you want something more sophisticated all you need to do is provide your own implementation of this strategy and register it as a bean. + +[source, java] +---- +@Bean +public DefaultMessageRoutingHandler defaultRoutingHandler() { + return new DefaultMessageRoutingHandler() { + @Override + public void accept(Message message) { + // do something really cool + } + }; +} +---- ==== Function Filtering Filtering is the type of routing where there are only two paths - 'go' or 'discard'. In terms of functions it mean diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/DefaultMessageRoutingHandler.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/DefaultMessageRoutingHandler.java new file mode 100644 index 000000000..87c1b7ec7 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/DefaultMessageRoutingHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright 2016-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context; + +import java.util.function.Consumer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.cloud.function.context.config.RoutingFunction; +import org.springframework.messaging.Message; + +/** + * Strategy for implementing a handler for un-routable messages. + * Works in parallel with {@link RoutingFunction}. When registered as a bean, RoutingFunction will not throw + * an exception if it can not route message and instead such message will be routed to this function. + * Its default implementation simply logs the un-routable event. + * Users are encouraged to provide their own implementation of this class. + * + * @author Oleg Zhurakousky + * @since 3.2.9 + * + */ +public class DefaultMessageRoutingHandler implements Consumer> { + + Log logger = LogFactory.getLog(DefaultMessageRoutingHandler.class); + + @Override + public void accept(Message message) { + if (logger.isDebugEnabled()) { + logger.debug("Route-to function can not be located in FunctionCatalog. Dropping unroutable message: " + message + ""); + } + else { + logger.warn("Route-to function can not be located in FunctionCatalog. Droping message"); + } + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java index db6ba6feb..e9727d508 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java @@ -57,6 +57,11 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA */ public final static String EXPECT_CONTENT_TYPE_HEADER = "expected-content-type"; + /** + * SpEL expression to be used with RoutingFunction. + */ + public final static String ROUTING_EXPRESSION = PREFIX + ".routing-expression"; + /** * The name of function definition property. */ @@ -68,6 +73,12 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA */ private String definition; + /** + * SpEL expression which should result in function definition (e.g., function name or composition instruction). + * NOTE: SpEL evaluation context's root object is the input argument (e.g., Message). + */ + private String routingExpression; + /** * List of functions that are not eligible to be registered in Function Catalog. */ @@ -102,12 +113,6 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA return configuration; } - /** - * SpEL expression which should result in function definition (e.g., function name or composition instruction). - * NOTE: SpEL evaluation context's root object is the input argument (e.g., Message). - */ - private String routingExpression; - @SuppressWarnings({ "unchecked", "rawtypes" }) public void setConfiguration(Map configuration) { for (Entry entry : configuration.entrySet()) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 450e06101..8984704ae 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -254,6 +254,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect if (!names.contains(functionDefinition)) { List eligibleFunction = names.stream() .filter(name -> !RoutingFunction.FUNCTION_NAME.equals(name)) + .filter(name -> !RoutingFunction.DEFAULT_ROUTE_HANDLER.equals(name)) .collect(Collectors.toList()); if (eligibleFunction.size() == 1 && !eligibleFunction.get(0).equals(functionDefinition) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index f2c68b11b..934e6991e 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -35,8 +35,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper; +import org.springframework.cloud.function.context.DefaultMessageRoutingHandler; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; +import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.MessageRoutingCallback; import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry; @@ -56,6 +58,7 @@ import org.springframework.context.annotation.ComponentScan.Filter; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.FilterType; import org.springframework.context.expression.BeanFactoryResolver; +import org.springframework.core.ResolvableType; import org.springframework.core.convert.converter.GenericConverter; import org.springframework.core.convert.support.ConfigurableConversionService; import org.springframework.core.convert.support.DefaultConversionService; @@ -137,9 +140,16 @@ public class ContextFunctionCatalogAutoConfiguration { return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper, functionProperties, functionInvocationHelper); } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Bean(RoutingFunction.FUNCTION_NAME) public RoutingFunction functionRouter(FunctionCatalog functionCatalog, FunctionProperties functionProperties, - BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) { + BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback, + @Nullable DefaultMessageRoutingHandler defaultMessageRoutingHandler) { + if (defaultMessageRoutingHandler != null) { + FunctionRegistration functionRegistration = new FunctionRegistration(defaultMessageRoutingHandler, RoutingFunction.DEFAULT_ROUTE_HANDLER); + functionRegistration.type(ResolvableType.forClassWithGenerics(Consumer.class, ResolvableType.forClassWithGenerics(Message.class, Object.class)).getType()); + ((FunctionRegistry) functionCatalog).register(functionRegistration); + } return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory), routingCallback); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java index 7d7a4a207..f26467716 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java @@ -58,6 +58,11 @@ public class RoutingFunction implements Function { */ public static final String FUNCTION_NAME = "functionRouter"; + /** + * The name of this function for routing of un-routable messages. + */ + public static final String DEFAULT_ROUTE_HANDLER = "defaultMessageRoutingHandler"; + private static Log logger = LogFactory.getLog(RoutingFunction.class); private final StandardEvaluationContext evalContext = new StandardEvaluationContext(); @@ -82,13 +87,6 @@ public class RoutingFunction implements Function { this(functionCatalog, extractIntoFunctionProperties(propertiesMap), beanResolver, routingCallback); } - private static FunctionProperties extractIntoFunctionProperties(Map propertiesMap) { - FunctionProperties functionProperties = new FunctionProperties(); - functionProperties.setDefinition(propertiesMap.get(FunctionProperties.FUNCTION_DEFINITION)); - functionProperties.setRoutingExpression(propertiesMap.get(FunctionProperties.PREFIX + ".routing-expression")); - return functionProperties; - } - public RoutingFunction(FunctionCatalog functionCatalog, FunctionProperties functionProperties, BeanResolver beanResolver, MessageRoutingCallback routingCallback) { this.functionCatalog = functionCatalog; @@ -98,6 +96,13 @@ public class RoutingFunction implements Function { evalContext.setBeanResolver(beanResolver); } + private static FunctionProperties extractIntoFunctionProperties(Map propertiesMap) { + FunctionProperties functionProperties = new FunctionProperties(); + functionProperties.setDefinition(propertiesMap.get(FunctionProperties.FUNCTION_DEFINITION)); + functionProperties.setRoutingExpression(propertiesMap.get(FunctionProperties.ROUTING_EXPRESSION)); + return functionProperties; + } + @Override public Object apply(Object input) { return this.route(input, input instanceof Publisher); @@ -134,14 +139,14 @@ public class RoutingFunction implements Function { } } if (function == null) { - if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.definition"))) { - function = functionFromDefinition((String) message.getHeaders().get("spring.cloud.function.definition")); + if (StringUtils.hasText((String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION))) { + function = functionFromDefinition((String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION)); if (function.isInputTypePublisher()) { this.assertOriginalInputIsNotPublisher(originalInputIsPublisher); } } - else if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.routing-expression"))) { - function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message, true); + else if (StringUtils.hasText((String) message.getHeaders().get(FunctionProperties.ROUTING_EXPRESSION))) { + function = this.functionFromExpression((String) message.getHeaders().get(FunctionProperties.ROUTING_EXPRESSION), message, true); if (function.isInputTypePublisher()) { this.assertOriginalInputIsNotPublisher(originalInputIsPublisher); } @@ -198,7 +203,7 @@ public class RoutingFunction implements Function { } private FunctionInvocationWrapper functionFromDefinition(String definition) { - FunctionInvocationWrapper function = functionCatalog.lookup(definition); + FunctionInvocationWrapper function = this.resolveFunction(definition); Assert.notNull(function, "Failed to lookup function to route based on the value of 'spring.cloud.function.definition' property '" + functionProperties.getDefinition() + "'"); if (logger.isInfoEnabled()) { @@ -217,14 +222,23 @@ public class RoutingFunction implements Function { input = MessageUtils.toCaseInsensitiveHeadersStructure((Message) input); } - String functionName = isViaHeader ? expression.getValue(this.headerEvalContext, input, String.class) : expression.getValue(this.evalContext, input, String.class); - Assert.hasText(functionName, "Failed to resolve function name based on routing expression '" + functionProperties.getRoutingExpression() + "'"); - FunctionInvocationWrapper function = functionCatalog.lookup(functionName); + String definition = isViaHeader ? expression.getValue(this.headerEvalContext, input, String.class) : expression.getValue(this.evalContext, input, String.class); + Assert.hasText(definition, "Failed to resolve function name based on routing expression '" + functionProperties.getRoutingExpression() + "'"); + FunctionInvocationWrapper function = this.resolveFunction(definition); Assert.notNull(function, "Failed to lookup function to route to based on the expression '" - + functionProperties.getRoutingExpression() + "' which resolved to '" + functionName + "' function name."); + + functionProperties.getRoutingExpression() + "' which resolved to '" + definition + "' function definition."); if (logger.isInfoEnabled()) { logger.info("Resolved function from provided [routing-expression] " + routingExpression); } return function; } + + private FunctionInvocationWrapper resolveFunction(String definition) { + FunctionInvocationWrapper function = functionCatalog.lookup(definition); + if (function == null) { + function = functionCatalog.lookup(RoutingFunction.DEFAULT_ROUTE_HANDLER); + } + return function; + } + } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java index c577b66f3..efe47e310 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java @@ -29,6 +29,7 @@ import reactor.test.StepVerifier; import org.springframework.beans.factory.BeanFactory; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.function.context.DefaultMessageRoutingHandler; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.MessageRoutingCallback; @@ -70,6 +71,31 @@ public class RoutingFunctionTests { return configureCatalog(RoutingFunctionConfiguration.class); } + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testDefaultRouting() { + Message message = MessageBuilder.withPayload("hello") + .setHeader(FunctionProperties.PREFIX + ".definition", "blah").build(); + + FunctionCatalog functionCatalog = this.configureCatalog(EmptyConfiguration.class); + Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); + assertThat(function).isNotNull(); + try { + function.apply(message); + fail(); + } + catch (Exception e) { + // Good + } + // + functionCatalog = this.configureCatalog(ConfigurationWithDefaultMessageRoutingHandler.class); + function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); + assertThat(function).isNotNull(); + function.apply(message); + ConfigurationWithDefaultMessageRoutingHandler config = this.context.getBean(ConfigurationWithDefaultMessageRoutingHandler.class); + assertThat(config.defaultHandlerInvoked).isTrue(); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testInvocationWithMessageAndHeader() { @@ -281,4 +307,25 @@ public class RoutingFunctionTests { return String::toUpperCase; } } + + @EnableAutoConfiguration + @Configuration + protected static class EmptyConfiguration { + } + + @EnableAutoConfiguration + @Configuration + protected static class ConfigurationWithDefaultMessageRoutingHandler { + public boolean defaultHandlerInvoked; + @Bean + public DefaultMessageRoutingHandler defaultRoutingHandler() { + return new DefaultMessageRoutingHandler() { + @Override + public void accept(Message message) { + super.accept(message); + defaultHandlerInvoked = true; + } + }; + } + } }