diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java index fb3572ec5..0ac10a729 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java @@ -51,7 +51,7 @@ import org.springframework.util.StringUtils; @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties({ FunctionProperties.class, RSocketFunctionProperties.class }) @ConditionalOnProperty(name = FunctionProperties.PREFIX + ".rsocket.enabled", matchIfMissing = true) -public class RSocketAutoConfiguration { +class RSocketAutoConfiguration { private static Log logger = LogFactory.getLog(RSocketAutoConfiguration.class); diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java index 10ea5ee79..e52a1cba3 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java @@ -44,7 +44,7 @@ import org.springframework.messaging.support.MessageBuilder; * @author Oleg Zhurakousky * @since 3.1 */ -public class RSocketListenerFunction implements Function, Publisher>> { +class RSocketListenerFunction implements Function, Publisher>> { private static String splash = " ____ _ _______ __ ____ __ _ ___ ____ __ __ \n" + " / __/__ ____(_)__ ___ _ / ___/ /__ __ _____/ / / __/_ _____ ____/ /_(_)__ ___ / _ \\/ __/__ ____/ /_____ / /_\n" + @@ -86,8 +86,8 @@ public class RSocketListenerFunction implements Function, Publis } if (this.listenAddress != null) { this.rsocketConnection = RSocketConnectionUtils.createServerSocket(rsocket, this.listenAddress); - this.printSplashScreen(this.targetFunction.getFunctionDefinition(), functionType); } + this.printSplashScreen(this.targetFunction.getFunctionDefinition(), functionType); } void stop() { @@ -191,7 +191,6 @@ public class RSocketListenerFunction implements Function, Publis private void printSplashScreen(String definition, Type type) { System.out.println(splash); System.out.println("Function Definition: " + definition + "; T[" + type + "]"); - System.out.println("RSocket Listen Address: " + this.listenAddress); System.out.println("======================================================\n"); } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketRoutingAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketRoutingAutoConfiguration.java index cf7fc5dc5..db0131cd0 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketRoutingAutoConfiguration.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketRoutingAutoConfiguration.java @@ -41,7 +41,7 @@ import org.springframework.messaging.rsocket.RSocketConnectorConfigurer; @ConditionalOnProperty(name = FunctionProperties.PREFIX + ".rsocket.enabled", matchIfMissing = true) @AutoConfigureBefore(RoutingClientAutoConfiguration.class) @AutoConfigureAfter(RSocketAutoConfiguration.class) -public class RSocketRoutingAutoConfiguration { +class RSocketRoutingAutoConfiguration { @Bean public RSocketConnectorConfigurer functionRSocketConnectorConfigurer( diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RoutingBrokerTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RoutingBrokerTests.java index 2bc9ef5b1..90e907bc5 100644 --- a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RoutingBrokerTests.java +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RoutingBrokerTests.java @@ -16,12 +16,11 @@ package org.springframework.cloud.function.rsocket; -import java.util.function.Consumer; import java.util.function.Function; import io.rsocket.routing.client.spring.RoutingMetadata; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -36,80 +35,91 @@ import org.springframework.util.SocketUtils; /** - * + * @author Spencer Gibb * @author Oleg Zhurakousky * @since 3.1 */ public class RoutingBrokerTests { + ConfigurableApplicationContext functionContext; + ConfigurableApplicationContext brokerContext; + ConfigurableApplicationContext clientContext; + + @AfterEach + public void cleanup() { + if (functionContext != null) { + functionContext.close(); + } + if (brokerContext != null) { + brokerContext.close(); + } + if (clientContext != null) { + clientContext.close(); + } + } + @Test - public void testImperativeFunctionAsRequestReply() throws Exception { + public void testRoutingWithProperty() throws Exception { + this.setup(true); + RSocketRequester requester = clientContext.getBean(RSocketRequester.class); + Mono result = requester.route("toupper") // used to find a messagemapping, so unused here + // auto creates metadata + .data("\"hello\"") + .retrieveMono(String.class); - ConfigurableApplicationContext functionContext = null; - ConfigurableApplicationContext brokerContext = null; - ConfigurableApplicationContext clientContext = null; - try { - int routingBrokerProxyPort = SocketUtils.findAvailableTcpPort(); - int routingBrokerClusterPort = SocketUtils.findAvailableTcpPort(); + StepVerifier + .create(result) + .expectNext("\"HELLO\"") + .expectComplete() + .verify(); + } - // start broker - brokerContext = new SpringApplicationBuilder(SimpleConfiguration.class).web(WebApplicationType.NONE).run( - "--logging.level.io.rsocket.routing.broker=TRACE", - "--spring.cloud.function.rsocket.enabled=false", - "--io.rsocket.routing.client.enabled=false", - "--io.rsocket.routing.broker.enabled=true", + @Test + public void testRoutingWithMessage() throws Exception { + this.setup(false); + RSocketRequester requester = clientContext.getBean(RSocketRequester.class); + RoutingMetadata metadata = clientContext.getBean(RoutingMetadata.class); + Mono result = requester.route("toupper") // used to find a messagemapping, so unused here + .metadata(metadata.address("samplefn")) + .data("\"hello\"") + .retrieveMono(String.class); + + StepVerifier + .create(result) + .expectNext("\"HELLO\"") + .expectComplete() + .verify(); + } + + private void setup(boolean routingWithProperty) { + int routingBrokerProxyPort = SocketUtils.findAvailableTcpPort(); + int routingBrokerClusterPort = SocketUtils.findAvailableTcpPort(); + // start broker + brokerContext = new SpringApplicationBuilder(SimpleConfiguration.class).web(WebApplicationType.NONE).run( + "--logging.level.io.rsocket.routing.broker=TRACE", "--spring.cloud.function.rsocket.enabled=false", + "--io.rsocket.routing.client.enabled=false", "--io.rsocket.routing.broker.enabled=true", "--io.rsocket.routing.broker.tcp.port=" + routingBrokerProxyPort, "--io.rsocket.routing.broker.cluster.port=" + routingBrokerClusterPort); - // start function connecting to broker, service-name=samplefn - functionContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class) - .web(WebApplicationType.NONE).run( - "--logging.level.org.springframework.cloud.function=DEBUG", - "--io.rsocket.routing.client.enabled=true", - "--io.rsocket.routing.client.service-name=samplefn", - "--io.rsocket.routing.client.brokers[0].host=localhost", - "--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort, - "--io.rsocket.routing.broker.enabled=false", - "--spring.cloud.function.definition=uppercase"); + // start function connecting to broker, service-name=samplefn + functionContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE) + .run("--logging.level.org.springframework.cloud.function=DEBUG", + "--io.rsocket.routing.client.enabled=true", "--io.rsocket.routing.client.service-name=samplefn", + "--io.rsocket.routing.client.brokers[0].host=localhost", + "--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort, + "--io.rsocket.routing.broker.enabled=false", "--spring.cloud.function.definition=uppercase"); - // start testclient connecting to broker, for RSocketRequester - clientContext = new SpringApplicationBuilder(SimpleConfiguration.class) - .web(WebApplicationType.NONE).run( - "--logging.level.io.rsocket.routing.client=TRACE", - "--spring.cloud.function.rsocket.enabled=false", - "--io.rsocket.routing.client.enabled=true", - "--io.rsocket.routing.client.service-name=testclient", - "--io.rsocket.routing.client.address.toupper.service_name=samplefn", - "--io.rsocket.routing.client.brokers[0].host=localhost", - "--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort, - "--io.rsocket.routing.broker.enabled=false"); - - RSocketRequester requester = clientContext.getBean(RSocketRequester.class); - //RoutingMetadata metadata = clientContext.getBean(RoutingMetadata.class); - Mono result = requester.route("toupper") // used to find a messagemapping, so unused here - // auto creates metadata - //.metadata(metadata.address("samplefn")) - .data("\"hello\"") - .retrieveMono(String.class); - - StepVerifier - .create(result) - .expectNext("\"HELLO\"") - .expectComplete() - .verify(); - } finally { - if (functionContext != null) { - functionContext.close(); - } - if (brokerContext != null) { - brokerContext.close(); - } - if (clientContext != null) { - clientContext.close(); - } - } + // start testclient connecting to broker, for RSocketRequester + clientContext = new SpringApplicationBuilder(SimpleConfiguration.class).web(WebApplicationType.NONE).run( + "--logging.level.io.rsocket.routing.client=TRACE", "--spring.cloud.function.rsocket.enabled=false", + "--io.rsocket.routing.client.enabled=true", "--io.rsocket.routing.client.service-name=testclient", + routingWithProperty ? "--io.rsocket.routing.client.address.toupper.service_name=samplefn" : "", + "--io.rsocket.routing.client.brokers[0].host=localhost", + "--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort, + "--io.rsocket.routing.broker.enabled=false"); } + @EnableAutoConfiguration @Configuration public static class SimpleConfiguration { @@ -125,50 +135,5 @@ public class RoutingBrokerTests { return v.toUpperCase(); }; } - - @Bean - public Function concat() { - return v -> { - return v + v; - }; - } - - @Bean - public Function echo() { - return v -> v; - } - - @Bean - public Function, Flux> uppercaseReactive() { - return flux -> flux.map(v -> { - System.out.println("Uppercasing: " + v); - return v.toUpperCase(); - }); - } - - @Bean - public Consumer log() { - return v -> { - System.out.println("==> In Consumer: " + new String(v)); - }; - } - } - - @EnableAutoConfiguration - @Configuration - public static class AdditionalFunctionConfiguration { - @Bean - public Function reverse() { - return v -> { - return new StringBuilder(v).reverse().toString(); - }; - } - - @Bean - public Function wrap() { - return v -> { - return "(" + v + ")"; - }; - } } }