diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java index d7868ae4e..990b642dc 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java @@ -32,6 +32,7 @@ import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; +import org.springframework.cloud.function.context.config.RoutingFunction; import org.springframework.core.MethodParameter; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ReactiveAdapterRegistry; @@ -154,6 +155,9 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler { @SuppressWarnings("unchecked") private String discoverAndInjectDestinationHeader(Message message) { String destination = this.functionProperties.getDefinition(); + if (!StringUtils.hasText(destination) && StringUtils.hasText(this.functionProperties.getRoutingExpression())) { + destination = RoutingFunction.FUNCTION_NAME; + } Map headersMap = (Map) ReflectionUtils .getField(this.headersField, message.getHeaders()); diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java index 88e45c13f..e37a9b900 100644 --- a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java @@ -58,7 +58,7 @@ public class RSocketAutoConfigurationRoutingTests { applicationContext.getBean(RSocketRequester.Builder.class); rsocketRequesterBuilder.tcp("localhost", port) - .route(RoutingFunction.FUNCTION_NAME) + .route("") .metadata("{\"func_name\":\"echo\"}", MimeTypeUtils.APPLICATION_JSON) .data("hello") .retrieveMono(String.class) @@ -68,14 +68,14 @@ public class RSocketAutoConfigurationRoutingTests { .verify(); rsocketRequesterBuilder.tcp("localhost", port) - .route(RoutingFunction.FUNCTION_NAME) - .metadata("{\"func_name\":\"uppercase\"}", MimeTypeUtils.APPLICATION_JSON) - .data("hello") - .retrieveMono(String.class) - .as(StepVerifier::create) - .expectNext("HELLO") - .expectComplete() - .verify(); + .route(RoutingFunction.FUNCTION_NAME) + .metadata("{\"func_name\":\"uppercase\"}", MimeTypeUtils.APPLICATION_JSON) + .data("hello") + .retrieveMono(String.class) + .as(StepVerifier::create) + .expectNext("HELLO") + .expectComplete() + .verify(); } } diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java index 8fa77132c..0a1531050 100644 --- a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java @@ -122,6 +122,31 @@ public class RSocketAutoConfigurationTests { } } + @Test + public void testWithRouteAndDefinition() { + int port = SocketUtils.findAvailableTcpPort(); + try ( + ConfigurableApplicationContext applicationContext = + new SpringApplicationBuilder(SampleFunctionConfiguration.class) + .web(WebApplicationType.NONE) + .run("--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.cloud.function.definition=echo", + "--spring.rsocket.server.port=" + port); + ) { + RSocketRequester.Builder rsocketRequesterBuilder = + applicationContext.getBean(RSocketRequester.Builder.class); + + rsocketRequesterBuilder.tcp("localhost", port) + .route("uppercase") + .data("hello") + .retrieveMono(String.class) + .as(StepVerifier::create) + .expectNext("HELLO") + .expectComplete() + .verify(); + } + } + @Test public void testImperativeFunctionAsRequestReplyWithComposition() { int port = SocketUtils.findAvailableTcpPort();