Commit a0d2721e authored by leoli's avatar leoli Committed by Brian Clozel

Use custom port for RSocket server over websocket

Prior to this change, the custom port configured for the RSocket
server would not be used if the server factory is using the
websocket transport and is provided with a custom
`ReactorResourceFactory`.

Fixes gh-18200
parent f06c2503
...@@ -128,8 +128,8 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur ...@@ -128,8 +128,8 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur
private ServerTransport<CloseableChannel> createTransport() { private ServerTransport<CloseableChannel> createTransport() {
if (this.transport == RSocketServer.TRANSPORT.WEBSOCKET) { if (this.transport == RSocketServer.TRANSPORT.WEBSOCKET) {
if (this.resourceFactory != null) { if (this.resourceFactory != null) {
HttpServer httpServer = HttpServer.create() HttpServer httpServer = HttpServer.create().tcpConfiguration((tcpServer) -> tcpServer
.tcpConfiguration((tcpServer) -> tcpServer.runOn(this.resourceFactory.getLoopResources())); .runOn(this.resourceFactory.getLoopResources()).addressSupplier(this::getListenAddress));
return WebsocketServerTransport.create(httpServer); return WebsocketServerTransport.create(httpServer);
} }
else { else {
......
...@@ -40,6 +40,7 @@ import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer; ...@@ -40,6 +40,7 @@ import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer;
import org.springframework.core.codec.CharSequenceEncoder; import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder; import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.SocketUtils; import org.springframework.util.SocketUtils;
...@@ -54,6 +55,7 @@ import static org.mockito.Mockito.mock; ...@@ -54,6 +55,7 @@ import static org.mockito.Mockito.mock;
* Tests for {@link NettyRSocketServerFactory} * Tests for {@link NettyRSocketServerFactory}
* *
* @author Brian Clozel * @author Brian Clozel
* @author Leo Li
*/ */
class NettyRSocketServerFactoryTests { class NettyRSocketServerFactoryTests {
...@@ -93,6 +95,7 @@ class NettyRSocketServerFactoryTests { ...@@ -93,6 +95,7 @@ class NettyRSocketServerFactoryTests {
String payload = "test payload"; String payload = "test payload";
String response = this.requester.route("test").data(payload).retrieveMono(String.class).block(TIMEOUT); String response = this.requester.route("test").data(payload).retrieveMono(String.class).block(TIMEOUT);
assertThat(this.rSocketServer.address().getPort()).isEqualTo(specificPort);
assertThat(response).isEqualTo(payload); assertThat(response).isEqualTo(payload);
assertThat(this.rSocketServer.address().getPort()).isEqualTo(specificPort); assertThat(this.rSocketServer.address().getPort()).isEqualTo(specificPort);
} }
...@@ -106,9 +109,29 @@ class NettyRSocketServerFactoryTests { ...@@ -106,9 +109,29 @@ class NettyRSocketServerFactoryTests {
this.requester = createRSocketWebSocketClient(); this.requester = createRSocketWebSocketClient();
String payload = "test payload"; String payload = "test payload";
String response = this.requester.route("test").data(payload).retrieveMono(String.class).block(TIMEOUT); String response = this.requester.route("test").data(payload).retrieveMono(String.class).block(TIMEOUT);
assertThat(response).isEqualTo(payload); assertThat(response).isEqualTo(payload);
} }
@Test
void websocketTransportWithReactorResource() {
NettyRSocketServerFactory factory = getFactory();
factory.setTransport(RSocketServer.TRANSPORT.WEBSOCKET);
ReactorResourceFactory resourceFactory = new ReactorResourceFactory();
resourceFactory.afterPropertiesSet();
factory.setResourceFactory(resourceFactory);
int specificPort = SocketUtils.findAvailableTcpPort(41000);
factory.setPort(specificPort);
this.rSocketServer = factory.create(new EchoRequestResponseAcceptor());
this.rSocketServer.start();
this.requester = createRSocketWebSocketClient();
String payload = "test payload";
String response = this.requester.route("test").data(payload).retrieveMono(String.class).block(TIMEOUT);
assertThat(response).isEqualTo(payload);
assertThat(this.rSocketServer.address().getPort()).isEqualTo(specificPort);
}
@Test @Test
void serverCustomizers() { void serverCustomizers() {
NettyRSocketServerFactory factory = getFactory(); NettyRSocketServerFactory factory = getFactory();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment