From 29115aeb643af8bfc3fdecd2bdc0910730d31032 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 22 Feb 2021 18:57:07 +0100 Subject: [PATCH] GH-654 Initial support for non-SpEL based routing mechanism --- .../context/MessageRoutingCallback.java | 39 ++++++++ ...ntextFunctionCatalogAutoConfiguration.java | 5 +- .../context/config/RoutingFunction.java | 98 ++++++++++++------- .../FunctionRSocketMessageHandler.java | 7 +- .../rsocket/FunctionRSocketUtils.java | 3 +- .../RSocketAutoConfigurationRoutingTests.java | 56 +++++++++++ 6 files changed, 171 insertions(+), 37 deletions(-) create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/MessageRoutingCallback.java diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/MessageRoutingCallback.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/MessageRoutingCallback.java new file mode 100644 index 000000000..75aa71bb0 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/MessageRoutingCallback.java @@ -0,0 +1,39 @@ +/* + * Copyright 2021-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context; + +import org.reactivestreams.Publisher; + +import org.springframework.messaging.Message; + +/** + * + * @author Oleg Zhurakousky + * @since 3.1 + */ +public interface MessageRoutingCallback { + + default String route(Message message, FunctionProperties functionProperties) { + // noop + return null; + } + + default String route(Publisher publisher, FunctionProperties functionProperties) { + //noop + return null; + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 00cc54cdd..2e778e185 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -34,6 +34,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistry; +import org.springframework.cloud.function.context.MessageRoutingCallback; import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry; import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.cloud.function.json.GsonMapper; @@ -116,8 +117,8 @@ public class ContextFunctionCatalogAutoConfiguration { @Bean(RoutingFunction.FUNCTION_NAME) RoutingFunction functionRouter(FunctionCatalog functionCatalog, FunctionProperties functionProperties, - BeanFactory beanFactory) { - return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory)); + BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) { + return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory), routingCallback); } private boolean isConverterEligible(Object messageConverter) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java index 7c8b77b31..319d89c42 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java @@ -26,6 +26,7 @@ import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; +import org.springframework.cloud.function.context.MessageRoutingCallback; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.context.expression.MapAccessor; import org.springframework.expression.BeanResolver; @@ -63,14 +64,17 @@ public class RoutingFunction implements Function { private final FunctionProperties functionProperties; + private final MessageRoutingCallback routingCallback; + public RoutingFunction(FunctionCatalog functionCatalog, FunctionProperties functionProperties) { - this(functionCatalog, functionProperties, null); + this(functionCatalog, functionProperties, null, null); } public RoutingFunction(FunctionCatalog functionCatalog, FunctionProperties functionProperties, - BeanResolver beanResolver) { + BeanResolver beanResolver, MessageRoutingCallback routingCallback) { this.functionCatalog = functionCatalog; this.functionProperties = functionProperties; + this.routingCallback = routingCallback; this.evalContext.addPropertyAccessor(new MapAccessor()); evalContext.setBeanResolver(beanResolver); } @@ -80,6 +84,7 @@ public class RoutingFunction implements Function { return this.route(input, input instanceof Publisher); } + /* * - Check if spring.cloud.function.definition is set in header and if it is use it. * If NOT @@ -90,45 +95,56 @@ public class RoutingFunction implements Function { * - Fail */ private Object route(Object input, boolean originalInputIsPublisher) { - FunctionInvocationWrapper function; + FunctionInvocationWrapper function = null; if (input instanceof Message) { Message message = (Message) input; - if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.definition"))) { - function = functionFromDefinition((String) message.getHeaders().get("spring.cloud.function.definition")); - if (function.isInputTypePublisher()) { - this.assertOriginalInputIsNotPublisher(originalInputIsPublisher); + + if (this.routingCallback != null) { + function = this.functionFromCallback(message); + } + if (function == null) { + if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.definition"))) { + function = functionFromDefinition((String) message.getHeaders().get("spring.cloud.function.definition")); + if (function.isInputTypePublisher()) { + this.assertOriginalInputIsNotPublisher(originalInputIsPublisher); + } } - } - else if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.routing-expression"))) { - function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message); - if (function.isInputTypePublisher()) { - this.assertOriginalInputIsNotPublisher(originalInputIsPublisher); + else if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.routing-expression"))) { + function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message); + if (function.isInputTypePublisher()) { + this.assertOriginalInputIsNotPublisher(originalInputIsPublisher); + } + } + else if (StringUtils.hasText(functionProperties.getRoutingExpression())) { + function = this.functionFromExpression(functionProperties.getRoutingExpression(), message); + } + else if (StringUtils.hasText(functionProperties.getDefinition())) { + function = this.functionFromDefinition(functionProperties.getDefinition()); + } + else { + throw new IllegalStateException("Failed to establish route, since neither were provided: " + + "'spring.cloud.function.definition' as Message header or as application property or " + + "'spring.cloud.function.routing-expression' as application property."); } - } - else if (StringUtils.hasText(functionProperties.getRoutingExpression())) { - function = this.functionFromExpression(functionProperties.getRoutingExpression(), message); - } - else if (StringUtils.hasText(functionProperties.getDefinition())) { - function = functionFromDefinition(functionProperties.getDefinition()); - } - else { - throw new IllegalStateException("Failed to establish route, since neither were provided: " - + "'spring.cloud.function.definition' as Message header or as application property or " - + "'spring.cloud.function.routing-expression' as application property."); } } else if (input instanceof Publisher) { - if (StringUtils.hasText(functionProperties.getRoutingExpression())) { - function = this.functionFromExpression(functionProperties.getRoutingExpression(), input); + if (this.routingCallback != null) { + function = this.functionFromCallback(input); } - else - if (StringUtils.hasText(functionProperties.getDefinition())) { - function = functionFromDefinition(functionProperties.getDefinition()); - } - else { - return input instanceof Mono - ? Mono.from((Publisher) input).map(v -> route(v, originalInputIsPublisher)) - : Flux.from((Publisher) input).map(v -> route(v, originalInputIsPublisher)); + if (function == null) { + if (StringUtils.hasText(functionProperties.getRoutingExpression())) { + function = this.functionFromExpression(functionProperties.getRoutingExpression(), input); + } + else + if (StringUtils.hasText(functionProperties.getDefinition())) { + function = functionFromDefinition(functionProperties.getDefinition()); + } + else { + return input instanceof Mono + ? Mono.from((Publisher) input).map(v -> route(v, originalInputIsPublisher)) + : Flux.from((Publisher) input).map(v -> route(v, originalInputIsPublisher)); + } } } else { @@ -156,6 +172,22 @@ public class RoutingFunction implements Function { + "spring.cloud.function.routing-expression' as application properties."); } + private FunctionInvocationWrapper functionFromCallback(Object input) { + if (input instanceof Message) { + String functionDefinition = this.routingCallback.route((Message) input, this.functionProperties); + if (StringUtils.hasText(functionDefinition)) { + return this.functionFromDefinition(functionDefinition); + } + } + else { + String functionDefinition = this.routingCallback.route((Publisher) input, this.functionProperties); + if (StringUtils.hasText(functionDefinition)) { + return this.functionFromDefinition(functionDefinition); + } + } + return null; + } + private FunctionInvocationWrapper functionFromDefinition(String definition) { FunctionInvocationWrapper function = functionCatalog.lookup(definition); Assert.notNull(function, "Failed to lookup function to route based on the value of 'spring.cloud.function.definition' property '" diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java index a9d0502ad..0509f7e87 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java @@ -32,6 +32,7 @@ import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; +import org.springframework.cloud.function.context.MessageRoutingCallback; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.context.config.RoutingFunction; import org.springframework.core.MethodParameter; @@ -59,6 +60,7 @@ import org.springframework.messaging.rsocket.annotation.support.RSocketFrameType import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.messaging.rsocket.annotation.support.RSocketPayloadReturnValueHandler; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.CollectionUtils; import org.springframework.util.MimeTypeUtils; import org.springframework.util.ReflectionUtils; import org.springframework.util.RouteMatcher; @@ -179,7 +181,10 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler { private String discoverAndInjectDestinationHeader(Message message) { String destination; - if (StringUtils.hasText(this.functionProperties.getRoutingExpression())) { + if (!CollectionUtils.isEmpty(this.getApplicationContext().getBeansOfType(MessageRoutingCallback.class))) { + destination = RoutingFunction.FUNCTION_NAME; + } + else if (StringUtils.hasText(this.functionProperties.getRoutingExpression())) { destination = RoutingFunction.FUNCTION_NAME; this.updateMessageHeaders(message, destination); } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketUtils.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketUtils.java index 71359cc4e..712de89d9 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketUtils.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketUtils.java @@ -78,7 +78,8 @@ final class FunctionRSocketUtils { if (functionCatalog.lookup(name) == null) { // this means RSocket String[] functionToRSocketDefinition = StringUtils.delimitedListToStringArray(name, ">"); if (functionToRSocketDefinition.length == 1) { - throw new IllegalArgumentException("Function definition '" + name + "' does not exist in Function Catalog"); + //throw new IllegalArgumentException("Function definition '" + name + "' does not exist in Function Catalog"); + return; } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Registering RSocket forwarder for '" + name + "' function."); diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java index 5d25c05fc..aa13b308f 100644 --- a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java @@ -29,6 +29,8 @@ import reactor.test.StepVerifier; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.function.context.FunctionProperties; +import org.springframework.cloud.function.context.MessageRoutingCallback; import org.springframework.cloud.function.context.config.RoutingFunction; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; @@ -173,6 +175,59 @@ public class RSocketAutoConfigurationRoutingTests { } } + @Test + public void testRoutingWithRoutingCallback() { + int port = SocketUtils.findAvailableTcpPort(); + try ( + ConfigurableApplicationContext applicationContext = + new SpringApplicationBuilder(RoutingCallbackFunctionConfiguration.class) + .web(WebApplicationType.NONE) + .run("--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.cloud.function.expected-content-type=text/plain", + "--spring.rsocket.server.port=" + port); + ) { + RSocketRequester.Builder rsocketRequesterBuilder = + applicationContext.getBean(RSocketRequester.Builder.class); + + rsocketRequesterBuilder.tcp("localhost", port) + .route("foo") + .metadata("{\"func_name\":\"uppercase\"}", MimeTypeUtils.APPLICATION_JSON) + .data("hello") + .retrieveMono(String.class) + .as(StepVerifier::create) + .expectNext("HELLO") + .expectComplete() + .verify(); + + } + } + + + + @EnableAutoConfiguration + @Configuration + public static class RoutingCallbackFunctionConfiguration { + @Bean + public MessageRoutingCallback customRouter() { + return new MessageRoutingCallback() { + @Override + public String route(Message message, FunctionProperties functionProperties) { + return (String) message.getHeaders().get("func_name"); + } + }; + } + + @Bean + public Function uppercase() { + return v -> v.toUpperCase(); + } + + @Bean + public Function concat() { + return v -> v + v; + } + } + @EnableAutoConfiguration @Configuration public static class SampleFunctionConfiguration { @@ -223,6 +278,7 @@ public class RSocketAutoConfigurationRoutingTests { return () -> "test data"; } + } }