From 9491c9ce04baa52d4019d02c26a7d5dab86a3fdf Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 17 Nov 2022 11:27:32 +0100 Subject: [PATCH] GH-958 Add support for default routing Resolves #958 --- .../main/asciidoc/spring-cloud-function.adoc | 21 ++++++++ .../context/DefaultMessageRoutingHandler.java | 51 +++++++++++++++++++ .../function/context/FunctionProperties.java | 17 ++++--- .../catalog/SimpleFunctionRegistry.java | 1 + ...ntextFunctionCatalogAutoConfiguration.java | 13 ++++- .../context/config/RoutingFunction.java | 46 +++++++++++------ .../context/config/RoutingFunctionTests.java | 47 +++++++++++++++++ 7 files changed, 173 insertions(+), 23 deletions(-) create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/DefaultMessageRoutingHandler.java diff --git a/docs/src/main/asciidoc/spring-cloud-function.adoc b/docs/src/main/asciidoc/spring-cloud-function.adoc index 4b87bcffd..3a130920b 100644 --- a/docs/src/main/asciidoc/spring-cloud-function.adoc +++ b/docs/src/main/asciidoc/spring-cloud-function.adoc @@ -240,6 +240,27 @@ conflict resolutions in the event multiple mechanisms are used at the same time, 2. Message Headers (If function is imperative and no `MessageRoutingCallback` provided) 3. Application Properties (Any function) +*Unroutable Messages* + +In the event route-to function is not available in catalog you will get an exception stating that. + +There are cases when such behavior is not desired and you may want to have some "catch-all" type function which can handle such messages. +To accomplish that, framework provides `org.springframework.cloud.function.context.DefaultMessageRoutingHandler` strategy. All you need to do is register it as a bean. +Its default implementation will simply log the fact that the message is un-routable, but will allow message flow to proceed without the exception, effectively dropping the un-routable message. +If you want something more sophisticated all you need to do is provide your own implementation of this strategy and register it as a bean. + +[source, java] +---- +@Bean +public DefaultMessageRoutingHandler defaultRoutingHandler() { + return new DefaultMessageRoutingHandler() { + @Override + public void accept(Message message) { + // do something really cool + } + }; +} +---- ==== Function Filtering Filtering is the type of routing where there are only two paths - 'go' or 'discard'. In terms of functions it mean diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/DefaultMessageRoutingHandler.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/DefaultMessageRoutingHandler.java new file mode 100644 index 000000000..87c1b7ec7 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/DefaultMessageRoutingHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright 2016-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context; + +import java.util.function.Consumer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.cloud.function.context.config.RoutingFunction; +import org.springframework.messaging.Message; + +/** + * Strategy for implementing a handler for un-routable messages. + * Works in parallel with {@link RoutingFunction}. When registered as a bean, RoutingFunction will not throw + * an exception if it can not route message and instead such message will be routed to this function. + * Its default implementation simply logs the un-routable event. + * Users are encouraged to provide their own implementation of this class. + * + * @author Oleg Zhurakousky + * @since 3.2.9 + * + */ +public class DefaultMessageRoutingHandler implements Consumer> { + + Log logger = LogFactory.getLog(DefaultMessageRoutingHandler.class); + + @Override + public void accept(Message message) { + if (logger.isDebugEnabled()) { + logger.debug("Route-to function can not be located in FunctionCatalog. Dropping unroutable message: " + message + ""); + } + else { + logger.warn("Route-to function can not be located in FunctionCatalog. Droping message"); + } + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java index 4a7a3f1e9..efad20fb4 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java @@ -50,6 +50,11 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA */ public final static String EXPECT_CONTENT_TYPE_HEADER = "expected-content-type"; + /** + * SpEL expression to be used with RoutingFunction. + */ + public final static String ROUTING_EXPRESSION = PREFIX + ".routing-expression"; + /** * The name of function definition property. */ @@ -61,6 +66,12 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA */ private String definition; + /** + * SpEL expression which should result in function definition (e.g., function name or composition instruction). + * NOTE: SpEL evaluation context's root object is the input argument (e.g., Message). + */ + private String routingExpression; + /** * List of functions that are not eligible to be registered in Function Catalog. */ @@ -95,12 +106,6 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA return configuration; } - /** - * SpEL expression which should result in function definition (e.g., function name or composition instruction). - * NOTE: SpEL evaluation context's root object is the input argument (e.g., Message). - */ - private String routingExpression; - @SuppressWarnings({ "unchecked", "rawtypes" }) public void setConfiguration(Map configuration) { for (Entry entry : configuration.entrySet()) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index afa7a7a8d..493b26628 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -243,6 +243,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry { if (!names.contains(functionDefinition)) { List eligibleFunction = names.stream() .filter(name -> !RoutingFunction.FUNCTION_NAME.equals(name)) + .filter(name -> !RoutingFunction.DEFAULT_ROUTE_HANDLER.equals(name)) .collect(Collectors.toList()); if (eligibleFunction.size() == 1 && !eligibleFunction.get(0).equals(functionDefinition) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index f709e02db..55299fb99 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -35,11 +35,14 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper; +import org.springframework.cloud.function.context.DefaultMessageRoutingHandler; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; +import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.MessageRoutingCallback; import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry; +import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.cloud.function.json.GsonMapper; import org.springframework.cloud.function.json.JacksonMapper; @@ -53,6 +56,7 @@ import org.springframework.context.annotation.ComponentScan.Filter; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.FilterType; import org.springframework.context.expression.BeanFactoryResolver; +import org.springframework.core.ResolvableType; import org.springframework.core.convert.converter.GenericConverter; import org.springframework.core.convert.support.ConfigurableConversionService; import org.springframework.core.convert.support.DefaultConversionService; @@ -131,9 +135,16 @@ public class ContextFunctionCatalogAutoConfiguration { return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper, functionProperties, functionInvocationHelper); } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Bean(RoutingFunction.FUNCTION_NAME) public RoutingFunction functionRouter(FunctionCatalog functionCatalog, FunctionProperties functionProperties, - BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) { + BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback, + @Nullable DefaultMessageRoutingHandler defaultMessageRoutingHandler) { + if (defaultMessageRoutingHandler != null) { + FunctionRegistration functionRegistration = new FunctionRegistration(defaultMessageRoutingHandler, RoutingFunction.DEFAULT_ROUTE_HANDLER); + functionRegistration.type(FunctionTypeUtils.consumerType(ResolvableType.forClassWithGenerics(Message.class, Object.class).getType())); + ((FunctionRegistry) functionCatalog).register(functionRegistration); + } return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory), routingCallback); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java index 6b90d5ef8..1d3f9d9ed 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java @@ -57,6 +57,11 @@ public class RoutingFunction implements Function { */ public static final String FUNCTION_NAME = "functionRouter"; + /** + * The name of this function for routing of un-routable messages. + */ + public static final String DEFAULT_ROUTE_HANDLER = "defaultMessageRoutingHandler"; + private static Log logger = LogFactory.getLog(RoutingFunction.class); private final StandardEvaluationContext evalContext = new StandardEvaluationContext(); @@ -81,13 +86,6 @@ public class RoutingFunction implements Function { this(functionCatalog, extractIntoFunctionProperties(propertiesMap), beanResolver, routingCallback); } - private static FunctionProperties extractIntoFunctionProperties(Map propertiesMap) { - FunctionProperties functionProperties = new FunctionProperties(); - functionProperties.setDefinition(propertiesMap.get(FunctionProperties.FUNCTION_DEFINITION)); - functionProperties.setRoutingExpression(propertiesMap.get(FunctionProperties.PREFIX + ".routing-expression")); - return functionProperties; - } - public RoutingFunction(FunctionCatalog functionCatalog, FunctionProperties functionProperties, BeanResolver beanResolver, MessageRoutingCallback routingCallback) { this.functionCatalog = functionCatalog; @@ -97,6 +95,13 @@ public class RoutingFunction implements Function { evalContext.setBeanResolver(beanResolver); } + private static FunctionProperties extractIntoFunctionProperties(Map propertiesMap) { + FunctionProperties functionProperties = new FunctionProperties(); + functionProperties.setDefinition(propertiesMap.get(FunctionProperties.FUNCTION_DEFINITION)); + functionProperties.setRoutingExpression(propertiesMap.get(FunctionProperties.ROUTING_EXPRESSION)); + return functionProperties; + } + @Override public Object apply(Object input) { return this.route(input, input instanceof Publisher); @@ -128,14 +133,14 @@ public class RoutingFunction implements Function { } } if (function == null) { - if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.definition"))) { - function = functionFromDefinition((String) message.getHeaders().get("spring.cloud.function.definition")); + if (StringUtils.hasText((String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION))) { + function = functionFromDefinition((String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION)); if (function.isInputTypePublisher()) { this.assertOriginalInputIsNotPublisher(originalInputIsPublisher); } } - else if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.routing-expression"))) { - function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message, true); + else if (StringUtils.hasText((String) message.getHeaders().get(FunctionProperties.ROUTING_EXPRESSION))) { + function = this.functionFromExpression((String) message.getHeaders().get(FunctionProperties.ROUTING_EXPRESSION), message, true); if (function.isInputTypePublisher()) { this.assertOriginalInputIsNotPublisher(originalInputIsPublisher); } @@ -192,7 +197,7 @@ public class RoutingFunction implements Function { } private FunctionInvocationWrapper functionFromDefinition(String definition) { - FunctionInvocationWrapper function = functionCatalog.lookup(definition); + FunctionInvocationWrapper function = this.resolveFunction(definition); Assert.notNull(function, "Failed to lookup function to route based on the value of 'spring.cloud.function.definition' property '" + functionProperties.getDefinition() + "'"); if (logger.isInfoEnabled()) { @@ -211,14 +216,23 @@ public class RoutingFunction implements Function { input = MessageUtils.toCaseInsensitiveHeadersStructure((Message) input); } - String functionName = isViaHeader ? expression.getValue(this.headerEvalContext, input, String.class) : expression.getValue(this.evalContext, input, String.class); - Assert.hasText(functionName, "Failed to resolve function name based on routing expression '" + functionProperties.getRoutingExpression() + "'"); - FunctionInvocationWrapper function = functionCatalog.lookup(functionName); + String definition = isViaHeader ? expression.getValue(this.headerEvalContext, input, String.class) : expression.getValue(this.evalContext, input, String.class); + Assert.hasText(definition, "Failed to resolve function name based on routing expression '" + functionProperties.getRoutingExpression() + "'"); + FunctionInvocationWrapper function = this.resolveFunction(definition); Assert.notNull(function, "Failed to lookup function to route to based on the expression '" - + functionProperties.getRoutingExpression() + "' which resolved to '" + functionName + "' function name."); + + functionProperties.getRoutingExpression() + "' which resolved to '" + definition + "' function definition."); if (logger.isInfoEnabled()) { logger.info("Resolved function from provided [routing-expression] " + routingExpression); } return function; } + + private FunctionInvocationWrapper resolveFunction(String definition) { + FunctionInvocationWrapper function = functionCatalog.lookup(definition); + if (function == null) { + function = functionCatalog.lookup(RoutingFunction.DEFAULT_ROUTE_HANDLER); + } + return function; + } + } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java index a961472eb..591d60b09 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java @@ -29,6 +29,7 @@ import reactor.test.StepVerifier; import org.springframework.beans.factory.BeanFactory; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.function.context.DefaultMessageRoutingHandler; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.MessageRoutingCallback; @@ -70,6 +71,31 @@ public class RoutingFunctionTests { return configureCatalog(RoutingFunctionConfiguration.class); } + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testDefaultRouting() { + Message message = MessageBuilder.withPayload("hello") + .setHeader(FunctionProperties.PREFIX + ".definition", "blah").build(); + + FunctionCatalog functionCatalog = this.configureCatalog(EmptyConfiguration.class); + Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); + assertThat(function).isNotNull(); + try { + function.apply(message); + fail(); + } + catch (Exception e) { + // Good + } + // + functionCatalog = this.configureCatalog(ConfigurationWithDefaultMessageRoutingHandler.class); + function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); + assertThat(function).isNotNull(); + function.apply(message); + ConfigurationWithDefaultMessageRoutingHandler config = this.context.getBean(ConfigurationWithDefaultMessageRoutingHandler.class); + assertThat(config.defaultHandlerInvoked).isTrue(); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testInvocationWithMessageAndHeader() { @@ -281,4 +307,25 @@ public class RoutingFunctionTests { return String::toUpperCase; } } + + @EnableAutoConfiguration + @Configuration + protected static class EmptyConfiguration { + } + + @EnableAutoConfiguration + @Configuration + protected static class ConfigurationWithDefaultMessageRoutingHandler { + public boolean defaultHandlerInvoked; + @Bean + public DefaultMessageRoutingHandler defaultRoutingHandler() { + return new DefaultMessageRoutingHandler() { + @Override + public void accept(Message message) { + super.accept(message); + defaultHandlerInvoked = true; + } + }; + } + } }