diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RoutingBrokerTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RoutingBrokerTests.java index 81ca2272d..addcb9445 100644 --- a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RoutingBrokerTests.java +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RoutingBrokerTests.java @@ -16,28 +16,10 @@ package org.springframework.cloud.function.rsocket; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.StandardCharsets; import java.util.function.Consumer; import java.util.function.Function; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; -import io.rsocket.Payload; -import io.rsocket.RSocket; -import io.rsocket.core.RSocketConnector; -import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.metadata.CompositeMetadataCodec; -import io.rsocket.routing.broker.spring.MimeTypes; -import io.rsocket.routing.common.Id; -import io.rsocket.routing.common.Tags; -import io.rsocket.routing.common.WellKnownKey; -import io.rsocket.routing.frames.AddressFlyweight; -import io.rsocket.routing.frames.RouteSetupFlyweight; -import io.rsocket.transport.netty.client.TcpClientTransport; -import io.rsocket.util.DefaultPayload; +import io.rsocket.routing.client.spring.SpringRouting; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -49,12 +31,9 @@ import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.messaging.rsocket.RSocketStrategies; +import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.util.SocketUtils; -import static io.netty.buffer.Unpooled.EMPTY_BUFFER; -import static io.rsocket.routing.broker.spring.MimeTypes.COMPOSITE_MIME_TYPE; - /** * @@ -68,12 +47,13 @@ public class RoutingBrokerTests { ConfigurableApplicationContext functionContext = null; ConfigurableApplicationContext brokerContext = null; + ConfigurableApplicationContext clientContext = null; try { int routingBrokerProxyPort = SocketUtils.findAvailableTcpPort(); int routingBrokerClusterPort = SocketUtils.findAvailableTcpPort(); // start broker - brokerContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run( + brokerContext = new SpringApplicationBuilder(SimpleConfiguration.class).web(WebApplicationType.NONE).run( "--logging.level.io.rsocket.routing.broker=TRACE", "--spring.cloud.function.rsocket.enabled=false", "--io.rsocket.routing.client.enabled=false", @@ -81,37 +61,35 @@ public class RoutingBrokerTests { "--io.rsocket.routing.broker.tcp.port=" + routingBrokerProxyPort, "--io.rsocket.routing.broker.cluster.port=" + routingBrokerClusterPort); - // start function connecting to broker, service-name=toupper + // start function connecting to broker, service-name=samplefn functionContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class) .web(WebApplicationType.NONE).run( "--logging.level.org.springframework.cloud.function=DEBUG", "--io.rsocket.routing.client.enabled=true", - "--io.rsocket.routing.client.service-name=toupper", + "--io.rsocket.routing.client.service-name=samplefn", "--io.rsocket.routing.client.brokers[0].host=localhost", "--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort, "--io.rsocket.routing.broker.enabled=false", "--spring.cloud.function.definition=uppercase"); - // setup metadata to identify the this test connecting to broker. - RSocketStrategies strategies = functionContext.getBean(RSocketStrategies.class); - Id testerId = Id.random(); - ByteBuf routeSetup = encodeRouteSetup(strategies, testerId, "tester"); - Payload setupPayload = DefaultPayload.create(EMPTY_BUFFER, routeSetup); + // start testclient connecting to broker, for RSocketRequester + clientContext = new SpringApplicationBuilder(SimpleConfiguration.class) + .web(WebApplicationType.NONE).run( + "--logging.level.io.rsocket.routing.client=TRACE", + "--spring.cloud.function.rsocket.enabled=false", + "--io.rsocket.routing.client.enabled=true", + "--io.rsocket.routing.client.service-name=testclient", + "--io.rsocket.routing.client.brokers[0].host=localhost", + "--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort, + "--io.rsocket.routing.broker.enabled=false"); - // connect to broker - RSocket socket = RSocketConnector.create().payloadDecoder(PayloadDecoder.ZERO_COPY) - .metadataMimeType(COMPOSITE_MIME_TYPE.toString()) - .setupPayload(setupPayload) - .connect(TcpClientTransport.create(routingBrokerProxyPort)) - .block(); - - // setup data for request to toupper service - ByteBuffer data = StandardCharsets.UTF_8.encode(CharBuffer.wrap("\"hello\"")); - // setup metadata for request to toupper service - ByteBuf routingMetadata = encodeAddress(strategies, testerId, "toupper"); - Payload payload = DefaultPayload.create(data, routingMetadata.nioBuffer()); - // call toupper service - Mono result = socket.requestResponse(payload).map(Payload::getDataUtf8); + RSocketRequester requester = clientContext.getBean(RSocketRequester.class); + SpringRouting routing = clientContext.getBean(SpringRouting.class); + Mono result = requester.route("toupper") // used to find a messagemapping, so unused here + // auto creates metadata + .metadata(routing.address("samplefn")) + .data("\"hello\"") + .retrieveMono(String.class); StepVerifier .create(result) @@ -125,44 +103,15 @@ public class RoutingBrokerTests { if (brokerContext != null) { brokerContext.close(); } + if (clientContext != null) { + clientContext.close(); + } } } - static ByteBuf encodeAddress(RSocketStrategies strategies, Id originRouteId, String serviceName) { - Tags tags = Tags.builder().with(WellKnownKey.SERVICE_NAME, serviceName) - .buildTags(); - ByteBuf address = AddressFlyweight - .encode(ByteBufAllocator.DEFAULT, originRouteId, Tags.empty(), tags); - - CompositeByteBuf composite = encodeComposite(address, MimeTypes.ROUTING_FRAME_MIME_TYPE - .toString()); - return composite; - } - - private static ByteBuf encodeRouteSetup(RSocketStrategies strategies, Id routeId, String serviceName) { - Tags tags = Tags.builder() - .with("current-time", String.valueOf(System.currentTimeMillis())) - .with(WellKnownKey.TIME_ZONE, System.currentTimeMillis() + "") - .buildTags(); - ByteBuf routeSetup = RouteSetupFlyweight - .encode(ByteBufAllocator.DEFAULT, routeId, serviceName, tags); - - CompositeByteBuf composite = encodeComposite(routeSetup, MimeTypes.ROUTING_FRAME_MIME_TYPE - .toString()); - return composite; - } - - private static CompositeByteBuf encodeComposite(ByteBuf byteBuf, String mimeType) { - CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer(); - CompositeMetadataCodec - .encodeAndAddMetadata(composite, ByteBufAllocator.DEFAULT, - mimeType, byteBuf); - return composite; - } - @EnableAutoConfiguration @Configuration - public static class RoutingBrokerConfiguration { + public static class SimpleConfiguration { }