Commit 1996952e authored by Brian Clozel's avatar Brian Clozel

Update RSocket configuration after Framework changes

Since spring-projects/spring-framework#23314, the `RSocketStrategies`
provide more codecs by default, and there is no need to order them to
avoid conflicts during mime type selection.

This commit also ensures that the `PayloadDecoder.ZERO_COPY` is
configured on the RSocket server if the configured `DataBufferFactory`
is compatible with that strategy.
parent 5a4ec081
......@@ -19,6 +19,7 @@ package org.springframework.boot.autoconfigure.rsocket;
import java.util.stream.Collectors;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.TcpServerTransport;
import reactor.netty.http.server.HttpServer;
......@@ -40,6 +41,7 @@ import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
......@@ -104,6 +106,18 @@ public class RSocketServerAutoConfiguration {
return new RSocketServerBootstrap(rSocketServerFactory, rSocketMessageHandler.serverResponder());
}
@Bean
ServerRSocketFactoryCustomizer frameDecoderServerFactoryCustomizer(
RSocketMessageHandler rSocketMessageHandler) {
return (serverRSocketFactory) -> {
if (rSocketMessageHandler.getRSocketStrategies()
.dataBufferFactory() instanceof NettyDataBufferFactory) {
return serverRSocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
}
return serverRSocketFactory;
};
}
}
static class OnRSocketWebServerCondition extends AllNestedConditions {
......
......@@ -31,11 +31,6 @@ import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.boot.rsocket.messaging.RSocketStrategiesCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.annotation.Order;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
......@@ -59,11 +54,7 @@ public class RSocketStrategiesAutoConfiguration {
@ConditionalOnMissingBean
public RSocketStrategies rSocketStrategies(ObjectProvider<RSocketStrategiesCustomizer> customizers) {
RSocketStrategies.Builder builder = RSocketStrategies.builder();
builder.reactiveAdapterStrategy(ReactiveAdapterRegistry.getSharedInstance());
customizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
builder.decoder(StringDecoder.allMimeTypes());
builder.encoder(CharSequenceEncoder.allMimeTypes());
builder.dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT));
return builder.build();
}
......@@ -74,7 +65,6 @@ public class RSocketStrategiesAutoConfiguration {
private static final MediaType[] SUPPORTED_TYPES = { MediaType.APPLICATION_CBOR };
@Bean
@Order(0)
@ConditionalOnBean(Jackson2ObjectMapperBuilder.class)
public RSocketStrategiesCustomizer jacksonCborRSocketStrategyCustomizer(Jackson2ObjectMapperBuilder builder) {
return (strategy) -> {
......@@ -94,7 +84,6 @@ public class RSocketStrategiesAutoConfiguration {
new MediaType("application", "*+json") };
@Bean
@Order(1)
@ConditionalOnBean(ObjectMapper.class)
public RSocketStrategiesCustomizer jacksonJsonRSocketStrategyCustomizer(ObjectMapper objectMapper) {
return (strategy) -> {
......
......@@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.rsocket.server.RSocketServerBootstrap;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.boot.test.context.runner.ReactiveWebApplicationContextRunner;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
......@@ -75,7 +76,8 @@ class RSocketServerAutoConfigurationTests {
void shouldCreateDefaultBeansForRSocketServerWhenPortIsSet() {
reactiveWebContextRunner().withPropertyValues("spring.rsocket.server.port=0")
.run((context) -> assertThat(context).hasSingleBean(RSocketServerFactory.class)
.hasSingleBean(RSocketServerBootstrap.class));
.hasSingleBean(RSocketServerBootstrap.class)
.hasSingleBean(ServerRSocketFactoryCustomizer.class));
}
@Test
......
......@@ -24,9 +24,9 @@ import org.springframework.boot.rsocket.messaging.RSocketStrategiesCustomizer;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.ByteArrayDecoder;
import org.springframework.core.codec.ByteArrayEncoder;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
......@@ -35,6 +35,7 @@ import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.messaging.rsocket.RSocketStrategies;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link RSocketStrategiesAutoConfiguration}
......@@ -51,14 +52,10 @@ class RSocketStrategiesAutoConfigurationTests {
this.contextRunner.run((context) -> {
assertThat(context).getBeans(RSocketStrategies.class).hasSize(1);
RSocketStrategies strategies = context.getBean(RSocketStrategies.class);
assertThat(strategies.decoders()).hasSize(3);
assertThat(strategies.decoders().get(0)).isInstanceOf(Jackson2CborDecoder.class);
assertThat(strategies.decoders().get(1)).isInstanceOf(Jackson2JsonDecoder.class);
assertThat(strategies.decoders().get(2)).isInstanceOf(StringDecoder.class);
assertThat(strategies.encoders()).hasSize(3);
assertThat(strategies.encoders().get(0)).isInstanceOf(Jackson2CborEncoder.class);
assertThat(strategies.encoders().get(1)).isInstanceOf(Jackson2JsonEncoder.class);
assertThat(strategies.encoders().get(2)).isInstanceOf(CharSequenceEncoder.class);
assertThat(strategies.decoders()).hasAtLeastOneElementOfType(Jackson2CborDecoder.class)
.hasAtLeastOneElementOfType(Jackson2JsonDecoder.class);
assertThat(strategies.encoders()).hasAtLeastOneElementOfType(Jackson2CborEncoder.class)
.hasAtLeastOneElementOfType(Jackson2JsonEncoder.class);
});
}
......@@ -75,8 +72,8 @@ class RSocketStrategiesAutoConfigurationTests {
this.contextRunner.withUserConfiguration(StrategiesCustomizer.class).run((context) -> {
assertThat(context).getBeans(RSocketStrategies.class).hasSize(1);
RSocketStrategies strategies = context.getBean(RSocketStrategies.class);
assertThat(strategies.decoders()).hasSize(4).hasAtLeastOneElementOfType(ByteArrayDecoder.class);
assertThat(strategies.encoders()).hasSize(4).hasAtLeastOneElementOfType(ByteArrayEncoder.class);
assertThat(strategies.decoders()).hasAtLeastOneElementOfType(CustomDecoder.class);
assertThat(strategies.encoders()).hasAtLeastOneElementOfType(CustomEncoder.class);
});
}
......@@ -96,9 +93,17 @@ class RSocketStrategiesAutoConfigurationTests {
@Bean
RSocketStrategiesCustomizer myCustomizer() {
return (strategies) -> strategies.encoder(new ByteArrayEncoder()).decoder(new ByteArrayDecoder());
return (strategies) -> strategies.encoder(mock(CustomEncoder.class)).decoder(mock(CustomDecoder.class));
}
}
interface CustomEncoder extends Encoder<String> {
}
interface CustomDecoder extends Decoder<String> {
}
}
......@@ -86,7 +86,8 @@ class RSocketWebSocketNettyRouteProviderTests {
private RSocketRequester createRSocketRequester(ApplicationContext context, WebServer server) {
int port = server.getPort();
RSocketRequester.Builder builder = context.getBean(RSocketRequester.Builder.class);
return builder.connectWebSocket(URI.create("ws://localhost:" + port + "/rsocket")).block();
return builder.dataMimeType(MediaType.APPLICATION_CBOR)
.connectWebSocket(URI.create("ws://localhost:" + port + "/rsocket")).block();
}
@Configuration(proxyBeanMethods = false)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment