diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/MessageRoutingCallback.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/MessageRoutingCallback.java index 137d2b859..d5229471c 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/MessageRoutingCallback.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/MessageRoutingCallback.java @@ -16,14 +16,29 @@ package org.springframework.cloud.function.context; +import org.springframework.cloud.function.context.config.RoutingFunction; import org.springframework.messaging.Message; /** + * Java-based strategy to assist with determining the name of the route-to function definition. + * Once implementation is registered as a bean in application context + * it will be picked up by a {@link RoutingFunction} and used to determine the name of the + * route-to function definition. + * + * While {@link RoutingFunction} provides several mechanisms to determine the route-to function definition + * this callback takes precedence over all of them. * * @author Oleg Zhurakousky * @since 3.1 */ public interface MessageRoutingCallback { - String route(Message message, FunctionProperties functionProperties); + /** + * Determines the name of the function definition to route incoming {@link Message}. + * + * @param message instance of incoming {@link Message} + * @param functionProperties instance of {@link FunctionProperties} + * @return the name of the route-to function definition + */ + String functionDefinition(Message message, FunctionProperties functionProperties); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java index d98929f1b..422327a8e 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java @@ -86,19 +86,23 @@ public class RoutingFunction implements Function { /* - * - Check if spring.cloud.function.definition is set in header and if it is use it. + * - Check if `this.routingCallback` is present and if it is use it (only for Message input) * If NOT - * - Check spring.cloud.function.routing-expression and if it is set use it + * - Check if spring.cloud.function.definition is set in header and if it is use it.(only for Message input) * If NOT - * - Check spring.cloud.function.definition is set in FunctionProperties and if it is use it + * - Check if spring.cloud.function.routing-expression is set in header and if it is set use it (only for Message input) + * If NOT + * - Check `spring.cloud.function.definition` is set in FunctionProperties and if it is use it (Message and Publisher) + * If NOT + * - Check `spring.cloud.function.routing-expression` is set in FunctionProperties and if it is use it (Message and Publisher) * If NOT * - Fail */ private Object route(Object input, boolean originalInputIsPublisher) { FunctionInvocationWrapper function = null; + if (input instanceof Message) { Message message = (Message) input; - if (this.routingCallback != null) { function = this.functionFromCallback(message); } @@ -129,17 +133,13 @@ public class RoutingFunction implements Function { } } else if (input instanceof Publisher) { - if (this.routingCallback != null) { - function = this.functionFromCallback(input); - } if (function == null) { - if (StringUtils.hasText(functionProperties.getRoutingExpression())) { - function = this.functionFromExpression(functionProperties.getRoutingExpression(), input); - } - else if (StringUtils.hasText(functionProperties.getDefinition())) { function = functionFromDefinition(functionProperties.getDefinition()); } + else if (StringUtils.hasText(functionProperties.getRoutingExpression())) { + function = this.functionFromExpression(functionProperties.getRoutingExpression(), input); + } else { return input instanceof Mono ? Mono.from((Publisher) input).map(v -> route(v, originalInputIsPublisher)) @@ -174,7 +174,7 @@ public class RoutingFunction implements Function { private FunctionInvocationWrapper functionFromCallback(Object input) { if (input instanceof Message) { - String functionDefinition = this.routingCallback.route((Message) input, this.functionProperties); + String functionDefinition = this.routingCallback.functionDefinition((Message) input, this.functionProperties); if (StringUtils.hasText(functionDefinition)) { return this.functionFromDefinition(functionDefinition); } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java index 3787f740e..075776afa 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java @@ -113,6 +113,9 @@ class RSocketListenerFunction implements Function>, Publish : Mono.just((Message) result); }); } + /* + * THis is wrong as we're effectively not letting user to see any metadat that may have been comunicated + */ return dataFlux.cast(Message.class).map(Message::getPayload); } } diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/MessageRoutingCallbackRSocketTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/MessageRoutingCallbackRSocketTests.java new file mode 100644 index 000000000..697c99880 --- /dev/null +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/MessageRoutingCallbackRSocketTests.java @@ -0,0 +1,144 @@ +/* + * Copyright 2021-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.rsocket; + +import java.util.function.Function; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.function.context.FunctionProperties; +import org.springframework.cloud.function.context.MessageRoutingCallback; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; +import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.SocketUtils; + +/** + * + * @author Oleg Zhurakousky + * + */ +public class MessageRoutingCallbackRSocketTests { + + @Test + public void testRoutingWithRoutingCallback() { + int port = SocketUtils.findAvailableTcpPort(); + try ( + ConfigurableApplicationContext applicationContext = + new SpringApplicationBuilder(RoutingCallbackFunctionConfiguration.class) + .web(WebApplicationType.NONE) + .run("--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.cloud.function.expected-content-type=text/plain", + "--spring.rsocket.server.port=" + port); + ) { + RSocketRequester.Builder rsocketRequesterBuilder = + applicationContext.getBean(RSocketRequester.Builder.class); + + // imperative + rsocketRequesterBuilder.tcp("localhost", port) + .route("foo") + .metadata("{\"func_name\":\"uppercase\"}", MimeTypeUtils.APPLICATION_JSON) + .data("hello") + .retrieveMono(String.class) + .as(StepVerifier::create) + .expectNext("HELLO") + .expectComplete() + .verify(); + + // imperative Message + rsocketRequesterBuilder.tcp("localhost", port) + .route("foo") + .metadata("{\"func_name\":\"uppercaseMessage\"}", MimeTypeUtils.APPLICATION_JSON) + .data("hello") + .retrieveMono(String.class) + .as(StepVerifier::create) + .expectNext("HELLO") + .expectComplete() + .verify(); + + // reactive + rsocketRequesterBuilder.tcp("localhost", port) + .route("foo") + .metadata("{\"func_name\":\"uppercaseReactive\"}", MimeTypeUtils.APPLICATION_JSON) + .data("hello") + .retrieveMono(String.class) + .as(StepVerifier::create) + .expectNext("HELLO") + .expectComplete() + .verify(); + + // reactive + rsocketRequesterBuilder.tcp("localhost", port) + .route("foo") + .metadata("{\"func_name\":\"uppercaseReactiveMessage\"}", MimeTypeUtils.APPLICATION_JSON) + .data("hello") + .retrieveMono(String.class) + .as(StepVerifier::create) + .expectNext("HELLO") + .expectComplete() + .verify(); + } + } + + @EnableAutoConfiguration + @Configuration + public static class RoutingCallbackFunctionConfiguration { + @Bean + public MessageRoutingCallback customRouter() { + return new MessageRoutingCallback() { + @Override + public String functionDefinition(Message message, FunctionProperties functionProperties) { + return (String) message.getHeaders().get("func_name"); + } + }; + } + + @Bean + public Function uppercase() { + return v -> v.toUpperCase(); + } + + @Bean + public Function, Message> uppercaseMessage() { + return m -> MessageBuilder.withPayload(m.getPayload().toUpperCase()).copyHeaders(m.getHeaders()).build(); + } + + @Bean + public Function, Flux> uppercaseReactive() { + return flux -> flux.map(v -> v.toUpperCase()); + } + + @Bean + public Function>, Flux>> uppercaseReactiveMessage() { + return flux -> flux.map(m -> MessageBuilder.withPayload(m.getPayload().toUpperCase()).copyHeaders(m.getHeaders()).build()); + } + + @Bean + public Function concat() { + return v -> v + v; + } + } +} diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java index aa13b308f..ee43fe03a 100644 --- a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java @@ -29,8 +29,6 @@ import reactor.test.StepVerifier; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; -import org.springframework.cloud.function.context.FunctionProperties; -import org.springframework.cloud.function.context.MessageRoutingCallback; import org.springframework.cloud.function.context.config.RoutingFunction; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; @@ -175,59 +173,6 @@ public class RSocketAutoConfigurationRoutingTests { } } - @Test - public void testRoutingWithRoutingCallback() { - int port = SocketUtils.findAvailableTcpPort(); - try ( - ConfigurableApplicationContext applicationContext = - new SpringApplicationBuilder(RoutingCallbackFunctionConfiguration.class) - .web(WebApplicationType.NONE) - .run("--logging.level.org.springframework.cloud.function=DEBUG", - "--spring.cloud.function.expected-content-type=text/plain", - "--spring.rsocket.server.port=" + port); - ) { - RSocketRequester.Builder rsocketRequesterBuilder = - applicationContext.getBean(RSocketRequester.Builder.class); - - rsocketRequesterBuilder.tcp("localhost", port) - .route("foo") - .metadata("{\"func_name\":\"uppercase\"}", MimeTypeUtils.APPLICATION_JSON) - .data("hello") - .retrieveMono(String.class) - .as(StepVerifier::create) - .expectNext("HELLO") - .expectComplete() - .verify(); - - } - } - - - - @EnableAutoConfiguration - @Configuration - public static class RoutingCallbackFunctionConfiguration { - @Bean - public MessageRoutingCallback customRouter() { - return new MessageRoutingCallback() { - @Override - public String route(Message message, FunctionProperties functionProperties) { - return (String) message.getHeaders().get("func_name"); - } - }; - } - - @Bean - public Function uppercase() { - return v -> v.toUpperCase(); - } - - @Bean - public Function concat() { - return v -> v + v; - } - } - @EnableAutoConfiguration @Configuration public static class SampleFunctionConfiguration {