Polish RoutingBrokerTests

This commit is contained in:
Oleg Zhurakousky
2020-08-25 21:20:11 +02:00
parent 87f71a55e5
commit 78de449d25
4 changed files with 76 additions and 112 deletions

View File

@@ -51,7 +51,7 @@ import org.springframework.util.StringUtils;
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties({ FunctionProperties.class, RSocketFunctionProperties.class })
@ConditionalOnProperty(name = FunctionProperties.PREFIX + ".rsocket.enabled", matchIfMissing = true)
public class RSocketAutoConfiguration {
class RSocketAutoConfiguration {
private static Log logger = LogFactory.getLog(RSocketAutoConfiguration.class);

View File

@@ -44,7 +44,7 @@ import org.springframework.messaging.support.MessageBuilder;
* @author Oleg Zhurakousky
* @since 3.1
*/
public class RSocketListenerFunction implements Function<Message<byte[]>, Publisher<Message<byte[]>>> {
class RSocketListenerFunction implements Function<Message<byte[]>, Publisher<Message<byte[]>>> {
private static String splash = " ____ _ _______ __ ____ __ _ ___ ____ __ __ \n" +
" / __/__ ____(_)__ ___ _ / ___/ /__ __ _____/ / / __/_ _____ ____/ /_(_)__ ___ / _ \\/ __/__ ____/ /_____ / /_\n" +
@@ -86,8 +86,8 @@ public class RSocketListenerFunction implements Function<Message<byte[]>, Publis
}
if (this.listenAddress != null) {
this.rsocketConnection = RSocketConnectionUtils.createServerSocket(rsocket, this.listenAddress);
this.printSplashScreen(this.targetFunction.getFunctionDefinition(), functionType);
}
this.printSplashScreen(this.targetFunction.getFunctionDefinition(), functionType);
}
void stop() {
@@ -191,7 +191,6 @@ public class RSocketListenerFunction implements Function<Message<byte[]>, Publis
private void printSplashScreen(String definition, Type type) {
System.out.println(splash);
System.out.println("Function Definition: " + definition + "; T[" + type + "]");
System.out.println("RSocket Listen Address: " + this.listenAddress);
System.out.println("======================================================\n");
}

View File

@@ -41,7 +41,7 @@ import org.springframework.messaging.rsocket.RSocketConnectorConfigurer;
@ConditionalOnProperty(name = FunctionProperties.PREFIX + ".rsocket.enabled", matchIfMissing = true)
@AutoConfigureBefore(RoutingClientAutoConfiguration.class)
@AutoConfigureAfter(RSocketAutoConfiguration.class)
public class RSocketRoutingAutoConfiguration {
class RSocketRoutingAutoConfiguration {
@Bean
public RSocketConnectorConfigurer functionRSocketConnectorConfigurer(

View File

@@ -16,12 +16,11 @@
package org.springframework.cloud.function.rsocket;
import java.util.function.Consumer;
import java.util.function.Function;
import io.rsocket.routing.client.spring.RoutingMetadata;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -36,80 +35,91 @@ import org.springframework.util.SocketUtils;
/**
*
* @author Spencer Gibb
* @author Oleg Zhurakousky
* @since 3.1
*/
public class RoutingBrokerTests {
ConfigurableApplicationContext functionContext;
ConfigurableApplicationContext brokerContext;
ConfigurableApplicationContext clientContext;
@AfterEach
public void cleanup() {
if (functionContext != null) {
functionContext.close();
}
if (brokerContext != null) {
brokerContext.close();
}
if (clientContext != null) {
clientContext.close();
}
}
@Test
public void testImperativeFunctionAsRequestReply() throws Exception {
public void testRoutingWithProperty() throws Exception {
this.setup(true);
RSocketRequester requester = clientContext.getBean(RSocketRequester.class);
Mono<String> result = requester.route("toupper") // used to find a messagemapping, so unused here
// auto creates metadata
.data("\"hello\"")
.retrieveMono(String.class);
ConfigurableApplicationContext functionContext = null;
ConfigurableApplicationContext brokerContext = null;
ConfigurableApplicationContext clientContext = null;
try {
int routingBrokerProxyPort = SocketUtils.findAvailableTcpPort();
int routingBrokerClusterPort = SocketUtils.findAvailableTcpPort();
StepVerifier
.create(result)
.expectNext("\"HELLO\"")
.expectComplete()
.verify();
}
// start broker
brokerContext = new SpringApplicationBuilder(SimpleConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.io.rsocket.routing.broker=TRACE",
"--spring.cloud.function.rsocket.enabled=false",
"--io.rsocket.routing.client.enabled=false",
"--io.rsocket.routing.broker.enabled=true",
@Test
public void testRoutingWithMessage() throws Exception {
this.setup(false);
RSocketRequester requester = clientContext.getBean(RSocketRequester.class);
RoutingMetadata metadata = clientContext.getBean(RoutingMetadata.class);
Mono<String> result = requester.route("toupper") // used to find a messagemapping, so unused here
.metadata(metadata.address("samplefn"))
.data("\"hello\"")
.retrieveMono(String.class);
StepVerifier
.create(result)
.expectNext("\"HELLO\"")
.expectComplete()
.verify();
}
private void setup(boolean routingWithProperty) {
int routingBrokerProxyPort = SocketUtils.findAvailableTcpPort();
int routingBrokerClusterPort = SocketUtils.findAvailableTcpPort();
// start broker
brokerContext = new SpringApplicationBuilder(SimpleConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.io.rsocket.routing.broker=TRACE", "--spring.cloud.function.rsocket.enabled=false",
"--io.rsocket.routing.client.enabled=false", "--io.rsocket.routing.broker.enabled=true",
"--io.rsocket.routing.broker.tcp.port=" + routingBrokerProxyPort,
"--io.rsocket.routing.broker.cluster.port=" + routingBrokerClusterPort);
// start function connecting to broker, service-name=samplefn
functionContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class)
.web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--io.rsocket.routing.client.enabled=true",
"--io.rsocket.routing.client.service-name=samplefn",
"--io.rsocket.routing.client.brokers[0].host=localhost",
"--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort,
"--io.rsocket.routing.broker.enabled=false",
"--spring.cloud.function.definition=uppercase");
// start function connecting to broker, service-name=samplefn
functionContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE)
.run("--logging.level.org.springframework.cloud.function=DEBUG",
"--io.rsocket.routing.client.enabled=true", "--io.rsocket.routing.client.service-name=samplefn",
"--io.rsocket.routing.client.brokers[0].host=localhost",
"--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort,
"--io.rsocket.routing.broker.enabled=false", "--spring.cloud.function.definition=uppercase");
// start testclient connecting to broker, for RSocketRequester
clientContext = new SpringApplicationBuilder(SimpleConfiguration.class)
.web(WebApplicationType.NONE).run(
"--logging.level.io.rsocket.routing.client=TRACE",
"--spring.cloud.function.rsocket.enabled=false",
"--io.rsocket.routing.client.enabled=true",
"--io.rsocket.routing.client.service-name=testclient",
"--io.rsocket.routing.client.address.toupper.service_name=samplefn",
"--io.rsocket.routing.client.brokers[0].host=localhost",
"--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort,
"--io.rsocket.routing.broker.enabled=false");
RSocketRequester requester = clientContext.getBean(RSocketRequester.class);
//RoutingMetadata metadata = clientContext.getBean(RoutingMetadata.class);
Mono<String> result = requester.route("toupper") // used to find a messagemapping, so unused here
// auto creates metadata
//.metadata(metadata.address("samplefn"))
.data("\"hello\"")
.retrieveMono(String.class);
StepVerifier
.create(result)
.expectNext("\"HELLO\"")
.expectComplete()
.verify();
} finally {
if (functionContext != null) {
functionContext.close();
}
if (brokerContext != null) {
brokerContext.close();
}
if (clientContext != null) {
clientContext.close();
}
}
// start testclient connecting to broker, for RSocketRequester
clientContext = new SpringApplicationBuilder(SimpleConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.io.rsocket.routing.client=TRACE", "--spring.cloud.function.rsocket.enabled=false",
"--io.rsocket.routing.client.enabled=true", "--io.rsocket.routing.client.service-name=testclient",
routingWithProperty ? "--io.rsocket.routing.client.address.toupper.service_name=samplefn" : "",
"--io.rsocket.routing.client.brokers[0].host=localhost",
"--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort,
"--io.rsocket.routing.broker.enabled=false");
}
@EnableAutoConfiguration
@Configuration
public static class SimpleConfiguration {
@@ -125,50 +135,5 @@ public class RoutingBrokerTests {
return v.toUpperCase();
};
}
@Bean
public Function<String, String> concat() {
return v -> {
return v + v;
};
}
@Bean
public Function<String, String> echo() {
return v -> v;
}
@Bean
public Function<Flux<String>, Flux<String>> uppercaseReactive() {
return flux -> flux.map(v -> {
System.out.println("Uppercasing: " + v);
return v.toUpperCase();
});
}
@Bean
public Consumer<byte[]> log() {
return v -> {
System.out.println("==> In Consumer: " + new String(v));
};
}
}
@EnableAutoConfiguration
@Configuration
public static class AdditionalFunctionConfiguration {
@Bean
public Function<String, String> reverse() {
return v -> {
return new StringBuilder(v).reverse().toString();
};
}
@Bean
public Function<String, String> wrap() {
return v -> {
return "(" + v + ")";
};
}
}
}