Commit ebc9c575 authored by Stephane Nicoll's avatar Stephane Nicoll

Allow to customize RSocketServer's fragment size

Closes gh-23247
parent ec0fae88
......@@ -22,6 +22,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.boot.web.server.Ssl;
import org.springframework.util.unit.DataSize;
/**
* {@link ConfigurationProperties properties} for RSocket support.
......@@ -62,6 +63,12 @@ public class RSocketProperties {
*/
private String mappingPath;
/**
* Maximum transmission unit. Frames larger than the specified value are
* fragmented.
*/
private DataSize fragmentSize;
@NestedConfigurationProperty
private Ssl ssl;
......@@ -97,6 +104,14 @@ public class RSocketProperties {
this.mappingPath = mappingPath;
}
public DataSize getFragmentSize() {
return this.fragmentSize;
}
public void setFragmentSize(DataSize fragmentSize) {
this.fragmentSize = fragmentSize;
}
public Ssl getSsl() {
return this.ssl;
}
......
......@@ -97,6 +97,7 @@ public class RSocketServerAutoConfiguration {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties.getServer().getAddress()).to(factory::setAddress);
map.from(properties.getServer().getPort()).to(factory::setPort);
map.from(properties.getServer().getFragmentSize()).to(factory::setFragmentSize);
map.from(properties.getServer().getSsl()).to(factory::setSsl);
factory.setRSocketServerCustomizers(customizers.orderedStream().collect(Collectors.toList()));
return factory;
......
......@@ -32,6 +32,7 @@ import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.util.unit.DataSize;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
......@@ -91,6 +92,28 @@ class RSocketServerAutoConfigurationTests {
});
}
@Test
void shouldSetFragmentWhenRSocketServerFragmentSizeIsSet() {
reactiveWebContextRunner()
.withPropertyValues("spring.rsocket.server.port=0", "spring.rsocket.server.fragment-size=12KB")
.run((context) -> {
assertThat(context).hasSingleBean(RSocketServerFactory.class);
RSocketServerFactory factory = context.getBean(RSocketServerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("fragmentSize", DataSize.ofKilobytes(12));
});
}
@Test
void shouldFailToSetFragmentWhenRSocketServerFragmentSizeIsBelow64() {
reactiveWebContextRunner()
.withPropertyValues("spring.rsocket.server.port=0", "spring.rsocket.server.fragment-size=60B")
.run((context) -> {
assertThat(context).hasFailed();
assertThat(context.getStartupFailure())
.hasMessageContaining("The smallest allowed mtu size is 64 bytes, provided: 60");
});
}
@Test
void shouldUseSslWhenRocketServerSslIsConfigured() {
reactiveWebContextRunner()
......
......@@ -33,6 +33,7 @@ import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;
import reactor.netty.tcp.TcpServer;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.rsocket.server.ConfigurableRSocketServerFactory;
import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.boot.rsocket.server.RSocketServerCustomizer;
......@@ -42,6 +43,7 @@ import org.springframework.boot.web.server.Ssl;
import org.springframework.boot.web.server.SslStoreProvider;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.util.Assert;
import org.springframework.util.unit.DataSize;
/**
* {@link RSocketServerFactory} that can be used to create {@link RSocketServer}s backed
......@@ -55,6 +57,8 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur
private int port = 9898;
private DataSize fragmentSize;
private InetAddress address;
private RSocketServer.Transport transport = RSocketServer.Transport.TCP;
......@@ -74,6 +78,11 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur
this.port = port;
}
@Override
public void setFragmentSize(DataSize fragmentSize) {
this.fragmentSize = fragmentSize;
}
@Override
public void setAddress(InetAddress address) {
this.address = address;
......@@ -138,11 +147,17 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur
public NettyRSocketServer create(SocketAcceptor socketAcceptor) {
ServerTransport<CloseableChannel> transport = createTransport();
io.rsocket.core.RSocketServer server = io.rsocket.core.RSocketServer.create(socketAcceptor);
this.rSocketServerCustomizers.forEach((customizer) -> customizer.customize(server));
configureServer(server);
Mono<CloseableChannel> starter = server.bind(transport);
return new NettyRSocketServer(starter, this.lifecycleTimeout);
}
private void configureServer(io.rsocket.core.RSocketServer server) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this.fragmentSize).asInt(DataSize::toBytes).to(server::fragment);
this.rSocketServerCustomizers.forEach((customizer) -> customizer.customize(server));
}
private ServerTransport<CloseableChannel> createTransport() {
if (this.transport == RSocketServer.Transport.WEBSOCKET) {
return createWebSocketTransport();
......
......@@ -20,6 +20,7 @@ import java.net.InetAddress;
import org.springframework.boot.web.server.Ssl;
import org.springframework.boot.web.server.SslStoreProvider;
import org.springframework.util.unit.DataSize;
/**
* A configurable {@link RSocketServerFactory}.
......@@ -36,6 +37,13 @@ public interface ConfigurableRSocketServerFactory {
*/
void setPort(int port);
/**
* Specify the maximum transmission unit. Frames larger than the specified
* {@code fragmentSize} are fragmented.
* @param fragmentSize the fragment size
*/
void setFragmentSize(DataSize fragmentSize);
/**
* Set the specific network address that the server should bind to.
* @param address the address to set (defaults to {@code null})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment