Migrates to using rsocket-routing-broker
This commit is contained in:
@@ -16,28 +16,10 @@
|
||||
|
||||
package org.springframework.cloud.function.rsocket;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.CharBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.core.RSocketConnector;
|
||||
import io.rsocket.frame.decoder.PayloadDecoder;
|
||||
import io.rsocket.metadata.CompositeMetadataCodec;
|
||||
import io.rsocket.routing.broker.spring.MimeTypes;
|
||||
import io.rsocket.routing.common.Id;
|
||||
import io.rsocket.routing.common.Tags;
|
||||
import io.rsocket.routing.common.WellKnownKey;
|
||||
import io.rsocket.routing.frames.AddressFlyweight;
|
||||
import io.rsocket.routing.frames.RouteSetupFlyweight;
|
||||
import io.rsocket.transport.netty.client.TcpClientTransport;
|
||||
import io.rsocket.util.DefaultPayload;
|
||||
import io.rsocket.routing.client.spring.SpringRouting;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
@@ -49,12 +31,9 @@ import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.rsocket.RSocketStrategies;
|
||||
import org.springframework.messaging.rsocket.RSocketRequester;
|
||||
import org.springframework.util.SocketUtils;
|
||||
|
||||
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
|
||||
import static io.rsocket.routing.broker.spring.MimeTypes.COMPOSITE_MIME_TYPE;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -68,12 +47,13 @@ public class RoutingBrokerTests {
|
||||
|
||||
ConfigurableApplicationContext functionContext = null;
|
||||
ConfigurableApplicationContext brokerContext = null;
|
||||
ConfigurableApplicationContext clientContext = null;
|
||||
try {
|
||||
int routingBrokerProxyPort = SocketUtils.findAvailableTcpPort();
|
||||
int routingBrokerClusterPort = SocketUtils.findAvailableTcpPort();
|
||||
|
||||
// start broker
|
||||
brokerContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
brokerContext = new SpringApplicationBuilder(SimpleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.io.rsocket.routing.broker=TRACE",
|
||||
"--spring.cloud.function.rsocket.enabled=false",
|
||||
"--io.rsocket.routing.client.enabled=false",
|
||||
@@ -81,37 +61,35 @@ public class RoutingBrokerTests {
|
||||
"--io.rsocket.routing.broker.tcp.port=" + routingBrokerProxyPort,
|
||||
"--io.rsocket.routing.broker.cluster.port=" + routingBrokerClusterPort);
|
||||
|
||||
// start function connecting to broker, service-name=toupper
|
||||
// start function connecting to broker, service-name=samplefn
|
||||
functionContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class)
|
||||
.web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--io.rsocket.routing.client.enabled=true",
|
||||
"--io.rsocket.routing.client.service-name=toupper",
|
||||
"--io.rsocket.routing.client.service-name=samplefn",
|
||||
"--io.rsocket.routing.client.brokers[0].host=localhost",
|
||||
"--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort,
|
||||
"--io.rsocket.routing.broker.enabled=false",
|
||||
"--spring.cloud.function.definition=uppercase");
|
||||
|
||||
// setup metadata to identify the this test connecting to broker.
|
||||
RSocketStrategies strategies = functionContext.getBean(RSocketStrategies.class);
|
||||
Id testerId = Id.random();
|
||||
ByteBuf routeSetup = encodeRouteSetup(strategies, testerId, "tester");
|
||||
Payload setupPayload = DefaultPayload.create(EMPTY_BUFFER, routeSetup);
|
||||
// start testclient connecting to broker, for RSocketRequester
|
||||
clientContext = new SpringApplicationBuilder(SimpleConfiguration.class)
|
||||
.web(WebApplicationType.NONE).run(
|
||||
"--logging.level.io.rsocket.routing.client=TRACE",
|
||||
"--spring.cloud.function.rsocket.enabled=false",
|
||||
"--io.rsocket.routing.client.enabled=true",
|
||||
"--io.rsocket.routing.client.service-name=testclient",
|
||||
"--io.rsocket.routing.client.brokers[0].host=localhost",
|
||||
"--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort,
|
||||
"--io.rsocket.routing.broker.enabled=false");
|
||||
|
||||
// connect to broker
|
||||
RSocket socket = RSocketConnector.create().payloadDecoder(PayloadDecoder.ZERO_COPY)
|
||||
.metadataMimeType(COMPOSITE_MIME_TYPE.toString())
|
||||
.setupPayload(setupPayload)
|
||||
.connect(TcpClientTransport.create(routingBrokerProxyPort))
|
||||
.block();
|
||||
|
||||
// setup data for request to toupper service
|
||||
ByteBuffer data = StandardCharsets.UTF_8.encode(CharBuffer.wrap("\"hello\""));
|
||||
// setup metadata for request to toupper service
|
||||
ByteBuf routingMetadata = encodeAddress(strategies, testerId, "toupper");
|
||||
Payload payload = DefaultPayload.create(data, routingMetadata.nioBuffer());
|
||||
// call toupper service
|
||||
Mono<String> result = socket.requestResponse(payload).map(Payload::getDataUtf8);
|
||||
RSocketRequester requester = clientContext.getBean(RSocketRequester.class);
|
||||
SpringRouting routing = clientContext.getBean(SpringRouting.class);
|
||||
Mono<String> result = requester.route("toupper") // used to find a messagemapping, so unused here
|
||||
// auto creates metadata
|
||||
.metadata(routing.address("samplefn"))
|
||||
.data("\"hello\"")
|
||||
.retrieveMono(String.class);
|
||||
|
||||
StepVerifier
|
||||
.create(result)
|
||||
@@ -125,44 +103,15 @@ public class RoutingBrokerTests {
|
||||
if (brokerContext != null) {
|
||||
brokerContext.close();
|
||||
}
|
||||
if (clientContext != null) {
|
||||
clientContext.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static ByteBuf encodeAddress(RSocketStrategies strategies, Id originRouteId, String serviceName) {
|
||||
Tags tags = Tags.builder().with(WellKnownKey.SERVICE_NAME, serviceName)
|
||||
.buildTags();
|
||||
ByteBuf address = AddressFlyweight
|
||||
.encode(ByteBufAllocator.DEFAULT, originRouteId, Tags.empty(), tags);
|
||||
|
||||
CompositeByteBuf composite = encodeComposite(address, MimeTypes.ROUTING_FRAME_MIME_TYPE
|
||||
.toString());
|
||||
return composite;
|
||||
}
|
||||
|
||||
private static ByteBuf encodeRouteSetup(RSocketStrategies strategies, Id routeId, String serviceName) {
|
||||
Tags tags = Tags.builder()
|
||||
.with("current-time", String.valueOf(System.currentTimeMillis()))
|
||||
.with(WellKnownKey.TIME_ZONE, System.currentTimeMillis() + "")
|
||||
.buildTags();
|
||||
ByteBuf routeSetup = RouteSetupFlyweight
|
||||
.encode(ByteBufAllocator.DEFAULT, routeId, serviceName, tags);
|
||||
|
||||
CompositeByteBuf composite = encodeComposite(routeSetup, MimeTypes.ROUTING_FRAME_MIME_TYPE
|
||||
.toString());
|
||||
return composite;
|
||||
}
|
||||
|
||||
private static CompositeByteBuf encodeComposite(ByteBuf byteBuf, String mimeType) {
|
||||
CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer();
|
||||
CompositeMetadataCodec
|
||||
.encodeAndAddMetadata(composite, ByteBufAllocator.DEFAULT,
|
||||
mimeType, byteBuf);
|
||||
return composite;
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
public static class RoutingBrokerConfiguration {
|
||||
public static class SimpleConfiguration {
|
||||
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user