@@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Copyright 2016-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.context;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.cloud.function.context.config.RoutingFunction;
|
||||
import org.springframework.messaging.Message;
|
||||
|
||||
/**
|
||||
* Strategy for implementing a handler for un-routable messages.
|
||||
* Works in parallel with {@link RoutingFunction}. When registered as a bean, RoutingFunction will not throw
|
||||
* an exception if it can not route message and instead such message will be routed to this function.
|
||||
* Its default implementation simply logs the un-routable event.
|
||||
* Users are encouraged to provide their own implementation of this class.
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @since 3.2.9
|
||||
*
|
||||
*/
|
||||
public class DefaultMessageRoutingHandler implements Consumer<Message<?>> {
|
||||
|
||||
Log logger = LogFactory.getLog(DefaultMessageRoutingHandler.class);
|
||||
|
||||
@Override
|
||||
public void accept(Message<?> message) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Route-to function can not be located in FunctionCatalog. Dropping unroutable message: " + message + "");
|
||||
}
|
||||
else {
|
||||
logger.warn("Route-to function can not be located in FunctionCatalog. Droping message");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -57,6 +57,11 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
|
||||
*/
|
||||
public final static String EXPECT_CONTENT_TYPE_HEADER = "expected-content-type";
|
||||
|
||||
/**
|
||||
* SpEL expression to be used with RoutingFunction.
|
||||
*/
|
||||
public final static String ROUTING_EXPRESSION = PREFIX + ".routing-expression";
|
||||
|
||||
/**
|
||||
* The name of function definition property.
|
||||
*/
|
||||
@@ -68,6 +73,12 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
|
||||
*/
|
||||
private String definition;
|
||||
|
||||
/**
|
||||
* SpEL expression which should result in function definition (e.g., function name or composition instruction).
|
||||
* NOTE: SpEL evaluation context's root object is the input argument (e.g., Message).
|
||||
*/
|
||||
private String routingExpression;
|
||||
|
||||
/**
|
||||
* List of functions that are not eligible to be registered in Function Catalog.
|
||||
*/
|
||||
@@ -102,12 +113,6 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
|
||||
return configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* SpEL expression which should result in function definition (e.g., function name or composition instruction).
|
||||
* NOTE: SpEL evaluation context's root object is the input argument (e.g., Message).
|
||||
*/
|
||||
private String routingExpression;
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public void setConfiguration(Map<String, FunctionConfigurationProperties> configuration) {
|
||||
for (Entry<String, FunctionConfigurationProperties> entry : configuration.entrySet()) {
|
||||
|
||||
@@ -254,6 +254,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
if (!names.contains(functionDefinition)) {
|
||||
List<String> eligibleFunction = names.stream()
|
||||
.filter(name -> !RoutingFunction.FUNCTION_NAME.equals(name))
|
||||
.filter(name -> !RoutingFunction.DEFAULT_ROUTE_HANDLER.equals(name))
|
||||
.collect(Collectors.toList());
|
||||
if (eligibleFunction.size() == 1
|
||||
&& !eligibleFunction.get(0).equals(functionDefinition)
|
||||
|
||||
@@ -35,8 +35,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper;
|
||||
import org.springframework.cloud.function.context.DefaultMessageRoutingHandler;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.FunctionProperties;
|
||||
import org.springframework.cloud.function.context.FunctionRegistration;
|
||||
import org.springframework.cloud.function.context.FunctionRegistry;
|
||||
import org.springframework.cloud.function.context.MessageRoutingCallback;
|
||||
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
|
||||
@@ -56,6 +58,7 @@ import org.springframework.context.annotation.ComponentScan.Filter;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.FilterType;
|
||||
import org.springframework.context.expression.BeanFactoryResolver;
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.convert.converter.GenericConverter;
|
||||
import org.springframework.core.convert.support.ConfigurableConversionService;
|
||||
import org.springframework.core.convert.support.DefaultConversionService;
|
||||
@@ -137,9 +140,16 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper, functionProperties, functionInvocationHelper);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Bean(RoutingFunction.FUNCTION_NAME)
|
||||
public RoutingFunction functionRouter(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
|
||||
BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
|
||||
BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback,
|
||||
@Nullable DefaultMessageRoutingHandler defaultMessageRoutingHandler) {
|
||||
if (defaultMessageRoutingHandler != null) {
|
||||
FunctionRegistration functionRegistration = new FunctionRegistration(defaultMessageRoutingHandler, RoutingFunction.DEFAULT_ROUTE_HANDLER);
|
||||
functionRegistration.type(ResolvableType.forClassWithGenerics(Consumer.class, ResolvableType.forClassWithGenerics(Message.class, Object.class)).getType());
|
||||
((FunctionRegistry) functionCatalog).register(functionRegistration);
|
||||
}
|
||||
return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory), routingCallback);
|
||||
}
|
||||
|
||||
|
||||
@@ -58,6 +58,11 @@ public class RoutingFunction implements Function<Object, Object> {
|
||||
*/
|
||||
public static final String FUNCTION_NAME = "functionRouter";
|
||||
|
||||
/**
|
||||
* The name of this function for routing of un-routable messages.
|
||||
*/
|
||||
public static final String DEFAULT_ROUTE_HANDLER = "defaultMessageRoutingHandler";
|
||||
|
||||
private static Log logger = LogFactory.getLog(RoutingFunction.class);
|
||||
|
||||
private final StandardEvaluationContext evalContext = new StandardEvaluationContext();
|
||||
@@ -82,13 +87,6 @@ public class RoutingFunction implements Function<Object, Object> {
|
||||
this(functionCatalog, extractIntoFunctionProperties(propertiesMap), beanResolver, routingCallback);
|
||||
}
|
||||
|
||||
private static FunctionProperties extractIntoFunctionProperties(Map<String, String> propertiesMap) {
|
||||
FunctionProperties functionProperties = new FunctionProperties();
|
||||
functionProperties.setDefinition(propertiesMap.get(FunctionProperties.FUNCTION_DEFINITION));
|
||||
functionProperties.setRoutingExpression(propertiesMap.get(FunctionProperties.PREFIX + ".routing-expression"));
|
||||
return functionProperties;
|
||||
}
|
||||
|
||||
public RoutingFunction(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
|
||||
BeanResolver beanResolver, MessageRoutingCallback routingCallback) {
|
||||
this.functionCatalog = functionCatalog;
|
||||
@@ -98,6 +96,13 @@ public class RoutingFunction implements Function<Object, Object> {
|
||||
evalContext.setBeanResolver(beanResolver);
|
||||
}
|
||||
|
||||
private static FunctionProperties extractIntoFunctionProperties(Map<String, String> propertiesMap) {
|
||||
FunctionProperties functionProperties = new FunctionProperties();
|
||||
functionProperties.setDefinition(propertiesMap.get(FunctionProperties.FUNCTION_DEFINITION));
|
||||
functionProperties.setRoutingExpression(propertiesMap.get(FunctionProperties.ROUTING_EXPRESSION));
|
||||
return functionProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object apply(Object input) {
|
||||
return this.route(input, input instanceof Publisher);
|
||||
@@ -134,14 +139,14 @@ public class RoutingFunction implements Function<Object, Object> {
|
||||
}
|
||||
}
|
||||
if (function == null) {
|
||||
if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.definition"))) {
|
||||
function = functionFromDefinition((String) message.getHeaders().get("spring.cloud.function.definition"));
|
||||
if (StringUtils.hasText((String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION))) {
|
||||
function = functionFromDefinition((String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION));
|
||||
if (function.isInputTypePublisher()) {
|
||||
this.assertOriginalInputIsNotPublisher(originalInputIsPublisher);
|
||||
}
|
||||
}
|
||||
else if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.routing-expression"))) {
|
||||
function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message, true);
|
||||
else if (StringUtils.hasText((String) message.getHeaders().get(FunctionProperties.ROUTING_EXPRESSION))) {
|
||||
function = this.functionFromExpression((String) message.getHeaders().get(FunctionProperties.ROUTING_EXPRESSION), message, true);
|
||||
if (function.isInputTypePublisher()) {
|
||||
this.assertOriginalInputIsNotPublisher(originalInputIsPublisher);
|
||||
}
|
||||
@@ -198,7 +203,7 @@ public class RoutingFunction implements Function<Object, Object> {
|
||||
}
|
||||
|
||||
private FunctionInvocationWrapper functionFromDefinition(String definition) {
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(definition);
|
||||
FunctionInvocationWrapper function = this.resolveFunction(definition);
|
||||
Assert.notNull(function, "Failed to lookup function to route based on the value of 'spring.cloud.function.definition' property '"
|
||||
+ functionProperties.getDefinition() + "'");
|
||||
if (logger.isInfoEnabled()) {
|
||||
@@ -217,14 +222,23 @@ public class RoutingFunction implements Function<Object, Object> {
|
||||
input = MessageUtils.toCaseInsensitiveHeadersStructure((Message<?>) input);
|
||||
}
|
||||
|
||||
String functionName = isViaHeader ? expression.getValue(this.headerEvalContext, input, String.class) : expression.getValue(this.evalContext, input, String.class);
|
||||
Assert.hasText(functionName, "Failed to resolve function name based on routing expression '" + functionProperties.getRoutingExpression() + "'");
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(functionName);
|
||||
String definition = isViaHeader ? expression.getValue(this.headerEvalContext, input, String.class) : expression.getValue(this.evalContext, input, String.class);
|
||||
Assert.hasText(definition, "Failed to resolve function name based on routing expression '" + functionProperties.getRoutingExpression() + "'");
|
||||
FunctionInvocationWrapper function = this.resolveFunction(definition);
|
||||
Assert.notNull(function, "Failed to lookup function to route to based on the expression '"
|
||||
+ functionProperties.getRoutingExpression() + "' which resolved to '" + functionName + "' function name.");
|
||||
+ functionProperties.getRoutingExpression() + "' which resolved to '" + definition + "' function definition.");
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Resolved function from provided [routing-expression] " + routingExpression);
|
||||
}
|
||||
return function;
|
||||
}
|
||||
|
||||
private FunctionInvocationWrapper resolveFunction(String definition) {
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(definition);
|
||||
if (function == null) {
|
||||
function = functionCatalog.lookup(RoutingFunction.DEFAULT_ROUTE_HANDLER);
|
||||
}
|
||||
return function;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ import reactor.test.StepVerifier;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.function.context.DefaultMessageRoutingHandler;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.FunctionProperties;
|
||||
import org.springframework.cloud.function.context.MessageRoutingCallback;
|
||||
@@ -70,6 +71,31 @@ public class RoutingFunctionTests {
|
||||
return configureCatalog(RoutingFunctionConfiguration.class);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Test
|
||||
public void testDefaultRouting() {
|
||||
Message<String> message = MessageBuilder.withPayload("hello")
|
||||
.setHeader(FunctionProperties.PREFIX + ".definition", "blah").build();
|
||||
|
||||
FunctionCatalog functionCatalog = this.configureCatalog(EmptyConfiguration.class);
|
||||
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
|
||||
assertThat(function).isNotNull();
|
||||
try {
|
||||
function.apply(message);
|
||||
fail();
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Good
|
||||
}
|
||||
//
|
||||
functionCatalog = this.configureCatalog(ConfigurationWithDefaultMessageRoutingHandler.class);
|
||||
function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
|
||||
assertThat(function).isNotNull();
|
||||
function.apply(message);
|
||||
ConfigurationWithDefaultMessageRoutingHandler config = this.context.getBean(ConfigurationWithDefaultMessageRoutingHandler.class);
|
||||
assertThat(config.defaultHandlerInvoked).isTrue();
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Test
|
||||
public void testInvocationWithMessageAndHeader() {
|
||||
@@ -281,4 +307,25 @@ public class RoutingFunctionTests {
|
||||
return String::toUpperCase;
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
protected static class EmptyConfiguration {
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
protected static class ConfigurationWithDefaultMessageRoutingHandler {
|
||||
public boolean defaultHandlerInvoked;
|
||||
@Bean
|
||||
public DefaultMessageRoutingHandler defaultRoutingHandler() {
|
||||
return new DefaultMessageRoutingHandler() {
|
||||
@Override
|
||||
public void accept(Message<?> message) {
|
||||
super.accept(message);
|
||||
defaultHandlerInvoked = true;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user