committed by
Oleg Zhurakousky
parent
6d872a20ee
commit
182e45ee00
@@ -16,7 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.function.rsocket;
|
||||
|
||||
import io.rsocket.routing.client.spring.RoutingClientAutoConfiguration;
|
||||
import io.rsocket.broker.client.spring.BrokerClientAutoConfiguration;
|
||||
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
|
||||
@@ -35,9 +35,9 @@ import org.springframework.messaging.rsocket.RSocketConnectorConfigurer;
|
||||
* @since 3.1
|
||||
*/
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@ConditionalOnClass(RoutingClientAutoConfiguration.class)
|
||||
@ConditionalOnClass(BrokerClientAutoConfiguration.class)
|
||||
@ConditionalOnProperty(name = FunctionProperties.PREFIX + ".rsocket.enabled", matchIfMissing = true)
|
||||
@AutoConfigureBefore(RoutingClientAutoConfiguration.class)
|
||||
@AutoConfigureBefore(BrokerClientAutoConfiguration.class)
|
||||
@AutoConfigureAfter(RSocketAutoConfiguration.class)
|
||||
class RSocketRoutingAutoConfiguration {
|
||||
|
||||
|
||||
@@ -16,9 +16,10 @@
|
||||
|
||||
package org.springframework.cloud.function.rsocket;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.function.Function;
|
||||
|
||||
import io.rsocket.routing.client.spring.RoutingMetadata;
|
||||
import io.rsocket.broker.client.spring.BrokerMetadata;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -64,7 +65,7 @@ public class RoutingBrokerTests {
|
||||
public void testRoutingWithProperty() throws Exception {
|
||||
this.setup(true);
|
||||
RSocketRequester requester = clientContext.getBean(RSocketRequester.class);
|
||||
// route(uppercase) used to find function, must match io.rsocket.routing.client.address entry
|
||||
// route(uppercase) used to find function, must match io.rsocket.broker.client.address entry
|
||||
Mono<String> result = requester.route("uppercase")
|
||||
// auto creates metadata
|
||||
.data("\"hello\"")
|
||||
@@ -74,14 +75,14 @@ public class RoutingBrokerTests {
|
||||
.create(result)
|
||||
.expectNext("HELLO")
|
||||
.expectComplete()
|
||||
.verify();
|
||||
.verify(Duration.ofSeconds(15));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRoutingWithMessage() throws Exception {
|
||||
this.setup(false);
|
||||
RSocketRequester requester = clientContext.getBean(RSocketRequester.class);
|
||||
RoutingMetadata metadata = clientContext.getBean(RoutingMetadata.class);
|
||||
BrokerMetadata metadata = clientContext.getBean(BrokerMetadata.class);
|
||||
Mono<String> result = requester.route("uppercase") // used to find function
|
||||
.metadata(metadata.address("samplefn"))
|
||||
.data("\"hello\"")
|
||||
@@ -91,41 +92,40 @@ public class RoutingBrokerTests {
|
||||
.create(result)
|
||||
.expectNext("HELLO")
|
||||
.expectComplete()
|
||||
.verify();
|
||||
.verify(Duration.ofSeconds(15));
|
||||
}
|
||||
|
||||
private void setup(boolean routingWithProperty) {
|
||||
int routingBrokerProxyPort = SocketUtils.findAvailableTcpPort();
|
||||
int routingBrokerClusterPort = SocketUtils.findAvailableTcpPort();
|
||||
int brokerProxyPort = SocketUtils.findAvailableTcpPort();
|
||||
int brokerClusterPort = SocketUtils.findAvailableTcpPort();
|
||||
// start broker
|
||||
brokerContext = new SpringApplicationBuilder(SimpleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.io.rsocket.routing.broker=TRACE",
|
||||
"--logging.level.io.rsocket.broker=TRACE",
|
||||
"--spring.cloud.function.rsocket.enabled=false",
|
||||
"--io.rsocket.routing.client.enabled=false",
|
||||
"--io.rsocket.routing.broker.enabled=true",
|
||||
"--io.rsocket.routing.broker.tcp.port=" + routingBrokerProxyPort,
|
||||
"--io.rsocket.routing.broker.cluster.port=" + routingBrokerClusterPort);
|
||||
"--io.rsocket.broker.client.enabled=false",
|
||||
"--io.rsocket.broker.enabled=true",
|
||||
"--io.rsocket.broker.uri=tcp://localhost:" + brokerProxyPort,
|
||||
"--io.rsocket.broker.cluster.uri=tcp://localhost:" + brokerClusterPort);
|
||||
|
||||
// start function connecting to broker, service-name=samplefn
|
||||
functionContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE)
|
||||
.run("--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--io.rsocket.routing.client.enabled=true",
|
||||
"--io.rsocket.routing.client.service-name=samplefn",
|
||||
"--io.rsocket.routing.client.brokers[0].tcp.host=localhost",
|
||||
"--io.rsocket.routing.client.brokers[0].tcp.port=" + routingBrokerProxyPort,
|
||||
"--io.rsocket.routing.broker.enabled=false",
|
||||
"--logging.level.io.rsocket.broker.client=TRACE",
|
||||
"--io.rsocket.broker.client.enabled=true",
|
||||
"--io.rsocket.broker.client.service-name=samplefn",
|
||||
"--io.rsocket.broker.client.brokers[0]=tcp://localhost:" + brokerProxyPort,
|
||||
"--io.rsocket.broker.enabled=false",
|
||||
"--spring.cloud.function.definition=uppercase");
|
||||
|
||||
// start testclient connecting to broker, for RSocketRequester
|
||||
clientContext = new SpringApplicationBuilder(SimpleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.io.rsocket.routing.client=TRACE",
|
||||
"--logging.level.io.rsocket.broker.client=TRACE",
|
||||
"--spring.cloud.function.rsocket.enabled=false",
|
||||
"--io.rsocket.routing.client.enabled=true",
|
||||
"--io.rsocket.routing.client.service-name=testclient",
|
||||
routingWithProperty ? "--io.rsocket.routing.client.address.uppercase.service_name=samplefn" : "",
|
||||
"--io.rsocket.routing.client.brokers[0].tcp.host=localhost",
|
||||
"--io.rsocket.routing.client.brokers[0].tcp.port=" + routingBrokerProxyPort,
|
||||
"--io.rsocket.routing.broker.enabled=false");
|
||||
"--io.rsocket.broker.client.enabled=true",
|
||||
"--io.rsocket.broker.client.service-name=testclient",
|
||||
routingWithProperty ? "--io.rsocket.broker.client.address.uppercase.service_name=samplefn" : "",
|
||||
"--io.rsocket.broker.client.brokers[0]=tcp://localhost:" + brokerProxyPort,
|
||||
"--io.rsocket.broker.enabled=false");
|
||||
}
|
||||
|
||||
|
||||
@@ -140,9 +140,7 @@ public class RoutingBrokerTests {
|
||||
public static class SampleFunctionConfiguration {
|
||||
@Bean
|
||||
public Function<String, String> uppercase() {
|
||||
return v -> {
|
||||
return v.toUpperCase();
|
||||
};
|
||||
return v -> v.toUpperCase();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
io.rsocket.routing.broker.enabled=false
|
||||
io.rsocket.routing.client.enabled=false
|
||||
io.rsocket.broker.enabled=false
|
||||
io.rsocket.broker.client.enabled=false
|
||||
|
||||
Reference in New Issue
Block a user