Commit b33944b5 authored by Brian Clozel's avatar Brian Clozel

Add RSocket server support with Spring Messaging

This commit adds support for RSocket server applications.
The auto-configuration will either add RSocket support to an existing
Reactor Netty server in a WebFlux application (as a WebSocket endpoint),
or bootstrap a brand new RSocket server instance.

Spring Boot will also auto-configure the Spring Messaging infrastructure
that supports Controller beans with `@MessageMapping` annotated methods.

Fixes gh-16021
parent 5e58f4a8
......@@ -112,6 +112,21 @@
<artifactId>reactor-netty</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.searchbox</groupId>
<artifactId>jest</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>jakarta.json.bind</groupId>
<artifactId>jakarta.json.bind-api</artifactId>
......@@ -127,11 +142,6 @@
<artifactId>money-api</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.searchbox</groupId>
<artifactId>jest</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
......
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.rsocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.server.TcpServerTransport;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MessageHandlerAcceptor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
/**
* {@link EnableAutoConfiguration} for Spring RSocket support in Spring Messaging.
*
* @author Brian Clozel
* @since 2.2.0
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ RSocketRequester.class, RSocketFactory.class,
TcpServerTransport.class })
@AutoConfigureAfter(RSocketStrategiesAutoConfiguration.class)
public class RSocketMessagingAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public MessageHandlerAcceptor messageHandlerAcceptor(
RSocketStrategies rSocketStrategies) {
MessageHandlerAcceptor acceptor = new MessageHandlerAcceptor();
acceptor.setRSocketStrategies(rSocketStrategies);
return acceptor;
}
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.rsocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.server.WebsocketRouteTransport;
import reactor.netty.http.server.HttpServer;
import org.springframework.boot.web.embedded.netty.NettyServerCustomizer;
import org.springframework.messaging.rsocket.MessageHandlerAcceptor;
/**
* {@libnk NettyServerCustomizer} that configures an RSocket Websocket endpoint.
*
* @author Brian Clozel
*/
class RSocketNettyServerCustomizer implements NettyServerCustomizer {
private final String mappingPath;
private final MessageHandlerAcceptor messageHandlerAcceptor;
RSocketNettyServerCustomizer(String mappingPath,
MessageHandlerAcceptor messageHandlerAcceptor) {
this.mappingPath = mappingPath;
this.messageHandlerAcceptor = messageHandlerAcceptor;
}
@Override
public HttpServer apply(HttpServer httpServer) {
final ServerTransport.ConnectionAcceptor acceptor = RSocketFactory.receive()
.acceptor(this.messageHandlerAcceptor).toConnectionAcceptor();
return httpServer.route((routes) -> routes.ws(this.mappingPath,
WebsocketRouteTransport.newHandler(acceptor)));
}
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.rsocket;
import java.net.InetAddress;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* {@link ConfigurationProperties properties} for RSocket support.
*
* @author Brian Clozel
* @since 2.2.0
*/
@ConfigurationProperties("spring.rsocket")
public class RSocketProperties {
private Server server = new Server();
public Server getServer() {
return this.server;
}
static class Server {
/**
* Server port.
*/
private Integer port;
/**
* Network address to which the server should bind.
*/
private InetAddress address;
/**
* RSocket transport protocol.
*/
private Transport transport = Transport.TCP;
/**
* Path under which RSocket handles requests (only works with websocket
* transport).
*/
private String mappingPath;
public Integer getPort() {
return this.port;
}
public void setPort(Integer port) {
this.port = port;
}
public InetAddress getAddress() {
return this.address;
}
public void setAddress(InetAddress address) {
this.address = address;
}
public Transport getTransport() {
return this.transport;
}
public void setTransport(Transport transport) {
this.transport = transport;
}
public String getMappingPath() {
return this.mappingPath;
}
public void setMappingPath(String mappingPath) {
this.mappingPath = mappingPath;
}
public enum Transport {
TCP, WEBSOCKET
}
}
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.rsocket;
import java.util.stream.Collectors;
import io.netty.buffer.PooledByteBufAllocator;
import io.rsocket.RSocketFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.rsocket.netty.NettyRSocketBootstrap;
import org.springframework.boot.rsocket.netty.NettyRSocketServerFactory;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.messaging.rsocket.MessageHandlerAcceptor;
import org.springframework.messaging.rsocket.RSocketStrategies;
/**
* {@link EnableAutoConfiguration Auto-configuration} for RSocket servers. In the case of
* {@link org.springframework.boot.WebApplicationType#REACTIVE}, the RSocket server is
* added as a WebSocket endpoint on the existing
* {@link org.springframework.boot.web.embedded.netty.NettyWebServer}. If a specific
* server port is configured, a new standalone RSocket server is created.
*
* @author Brian Clozel
* @since 2.2.0
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ RSocketFactory.class, RSocketStrategies.class,
PooledByteBufAllocator.class })
@ConditionalOnBean(MessageHandlerAcceptor.class)
@AutoConfigureAfter(RSocketStrategiesAutoConfiguration.class)
@EnableConfigurationProperties(RSocketProperties.class)
public class RSocketServerAutoConfiguration {
@Conditional(OnRSocketWebServerCondition.class)
@Configuration(proxyBeanMethods = false)
static class WebFluxServerAutoConfiguration {
@Bean
public WebServerFactoryCustomizer<NettyReactiveWebServerFactory> rSocketWebsocketCustomizer(
RSocketProperties properties,
MessageHandlerAcceptor messageHandlerAcceptor) {
RSocketNettyServerCustomizer customizer = new RSocketNettyServerCustomizer(
properties.getServer().getMappingPath(), messageHandlerAcceptor);
return (factory) -> factory.addServerCustomizers(customizer);
}
}
@ConditionalOnProperty(prefix = "spring.rsocket.server", name = "port")
@Configuration(proxyBeanMethods = false)
static class EmbeddedServerAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public ReactorResourceFactory reactorServerResourceFactory() {
return new ReactorResourceFactory();
}
@Bean
@ConditionalOnMissingBean
public RSocketServerFactory rSocketServerFactory(RSocketProperties properties,
ReactorResourceFactory resourceFactory,
ObjectProvider<ServerRSocketFactoryCustomizer> customizers) {
NettyRSocketServerFactory factory = new NettyRSocketServerFactory();
factory.setResourceFactory(resourceFactory);
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties.getServer().getAddress()).to(factory::setAddress);
map.from(properties.getServer().getPort()).to(factory::setPort);
factory.setServerCustomizers(
customizers.orderedStream().collect(Collectors.toList()));
return factory;
}
@Bean
public NettyRSocketBootstrap nettyRSocketBootstrap(
RSocketServerFactory rSocketServerFactory,
MessageHandlerAcceptor messageHandlerAcceptor) {
return new NettyRSocketBootstrap(rSocketServerFactory,
messageHandlerAcceptor);
}
}
static class OnRSocketWebServerCondition extends AllNestedConditions {
OnRSocketWebServerCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
static class IsReactiveWebApplication {
}
@ConditionalOnProperty(prefix = "spring.rsocket.server", name = "port",
matchIfMissing = true)
static class HasNoPortConfigured {
}
@ConditionalOnProperty(prefix = "spring.rsocket.server", name = "mapping-path")
static class HasMappingPathConfigured {
}
@ConditionalOnProperty(prefix = "spring.rsocket.server", name = "transport",
havingValue = "websocket")
static class HasWebsocketTransportConfigured {
}
}
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.rsocket;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.PooledByteBufAllocator;
import io.rsocket.RSocketFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.boot.rsocket.messaging.RSocketStrategiesCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.messaging.rsocket.RSocketStrategies;
/**
* {@link EnableAutoConfiguration} for {@link RSocketStrategies}.
*
* @author Brian Clozel
* @since 2.2.0
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ RSocketFactory.class, RSocketStrategies.class,
PooledByteBufAllocator.class })
@AutoConfigureAfter(JacksonAutoConfiguration.class)
public class RSocketStrategiesAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public RSocketStrategies rSocketStrategies(
ObjectProvider<RSocketStrategiesCustomizer> customizers) {
RSocketStrategies.Builder builder = RSocketStrategies.builder();
builder.reactiveAdapterStrategy(ReactiveAdapterRegistry.getSharedInstance());
customizers.stream().forEach((customizer) -> customizer.customize(builder));
builder.dataBufferFactory(
new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT));
return builder.build();
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(ObjectMapper.class)
protected static class JacksonStrategyConfiguration {
@Bean
@Order(0)
@ConditionalOnBean(ObjectMapper.class)
public RSocketStrategiesCustomizer jacksonStrategyCustomizer(
ObjectMapper objectMapper) {
return (strategy) -> {
MediaType[] supportedTypes = new MediaType[] { MediaType.APPLICATION_JSON,
new MediaType("application", "*+json") };
strategy.decoder(new Jackson2JsonDecoder(objectMapper, supportedTypes));
strategy.encoder(new Jackson2JsonEncoder(objectMapper, supportedTypes));
};
}
}
}
......@@ -99,6 +99,9 @@ org.springframework.boot.autoconfigure.mustache.MustacheAutoConfiguration,\
org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration,\
org.springframework.boot.autoconfigure.quartz.QuartzAutoConfiguration,\
org.springframework.boot.autoconfigure.reactor.core.ReactorCoreAutoConfiguration,\
org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfiguration,\
org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration,\
org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration,\
org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration,\
org.springframework.boot.autoconfigure.security.servlet.SecurityRequestMatcherProviderAutoConfiguration,\
org.springframework.boot.autoconfigure.security.servlet.UserDetailsServiceAutoConfiguration,\
......
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.rsocket;
import org.junit.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.messaging.rsocket.MessageHandlerAcceptor;
import org.springframework.messaging.rsocket.RSocketStrategies;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link RSocketMessagingAutoConfiguration}.
*
* @author Brian Clozel
*/
public class RSocketMessagingAutoConfigurationTests {
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(
AutoConfigurations.of(RSocketMessagingAutoConfiguration.class))
.withUserConfiguration(BaseConfiguration.class);
@Test
public void shouldCreateDefaultBeans() {
this.contextRunner.run((context) -> assertThat(context)
.getBeans(MessageHandlerAcceptor.class).hasSize(1));
}
@Test
public void shouldFailOnMissingStrategies() {
new ApplicationContextRunner()
.withConfiguration(
AutoConfigurations.of(RSocketMessagingAutoConfiguration.class))
.run((context) -> {
assertThat(context).hasFailed();
assertThat(context.getStartupFailure().getMessage())
.contains("No qualifying bean of type "
+ "'org.springframework.messaging.rsocket.RSocketStrategies' available");
});
}
@Test
public void shouldUseCustomMessageHandlerAcceptor() {
this.contextRunner.withUserConfiguration(CustomMessageHandlerAcceptor.class)
.run((context) -> assertThat(context)
.getBeanNames(MessageHandlerAcceptor.class)
.containsOnly("customMessageHandlerAcceptor"));
}
@Configuration
static class BaseConfiguration {
@Bean
public RSocketStrategies rSocketStrategies() {
return RSocketStrategies.builder()
.encoder(CharSequenceEncoder.textPlainOnly())
.decoder(StringDecoder.textPlainOnly()).build();
}
}
@Configuration
static class CustomMessageHandlerAcceptor {
@Bean
public MessageHandlerAcceptor customMessageHandlerAcceptor() {
MessageHandlerAcceptor acceptor = new MessageHandlerAcceptor();
RSocketStrategies strategies = RSocketStrategies.builder()
.encoder(CharSequenceEncoder.textPlainOnly())
.decoder(StringDecoder.textPlainOnly()).build();
acceptor.setRSocketStrategies(strategies);
return acceptor;
}
}
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.rsocket;
import org.junit.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.rsocket.netty.NettyRSocketBootstrap;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.boot.test.context.runner.ReactiveWebApplicationContextRunner;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.messaging.rsocket.MessageHandlerAcceptor;
import org.springframework.messaging.rsocket.RSocketStrategies;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link RSocketServerAutoConfiguration}.
*
* @author Brian Clozel
*/
public class RSocketServerAutoConfigurationTests {
@Test
public void shouldNotCreateBeansByDefault() {
ApplicationContextRunner contextRunner = createContextRunner();
contextRunner.run((context) -> assertThat(context)
.doesNotHaveBean(WebServerFactoryCustomizer.class)
.doesNotHaveBean(RSocketServerFactory.class)
.doesNotHaveBean(NettyRSocketBootstrap.class));
}
@Test
public void shouldNotCreateDefaultBeansForReactiveWebAppWithoutMapping() {
ReactiveWebApplicationContextRunner contextRunner = createReactiveWebContextRunner();
contextRunner.run((context) -> assertThat(context)
.doesNotHaveBean(WebServerFactoryCustomizer.class)
.doesNotHaveBean(RSocketServerFactory.class)
.doesNotHaveBean(NettyRSocketBootstrap.class));
}
@Test
public void shouldNotCreateDefaultBeansForReactiveWebAppWithWrongTransport() {
ReactiveWebApplicationContextRunner contextRunner = createReactiveWebContextRunner();
contextRunner
.withPropertyValues("spring.rsocket.server.transport=tcp",
"spring.rsocket.server.mapping-path=/rsocket")
.run((context) -> assertThat(context)
.doesNotHaveBean(WebServerFactoryCustomizer.class)
.doesNotHaveBean(RSocketServerFactory.class)
.doesNotHaveBean(NettyRSocketBootstrap.class));
}
@Test
public void shouldCreateDefaultBeansForReactiveWebApp() {
ReactiveWebApplicationContextRunner contextRunner = createReactiveWebContextRunner();
contextRunner
.withPropertyValues("spring.rsocket.server.transport=websocket",
"spring.rsocket.server.mapping-path=/rsocket")
.run((context) -> assertThat(context)
.getBeanNames(WebServerFactoryCustomizer.class).hasSize(1)
.containsOnly("rSocketWebsocketCustomizer"));
}
@Test
public void shouldCreateDefaultBeansForRSocketServerWhenPortIsSet() {
ReactiveWebApplicationContextRunner contextRunner = createReactiveWebContextRunner();
contextRunner.withPropertyValues("spring.rsocket.server.port=0")
.run((context) -> assertThat(context)
.hasSingleBean(RSocketServerFactory.class)
.hasSingleBean(NettyRSocketBootstrap.class));
}
private ApplicationContextRunner createContextRunner() {
return new ApplicationContextRunner()
.withUserConfiguration(BaseConfiguration.class).withConfiguration(
AutoConfigurations.of(RSocketServerAutoConfiguration.class));
}
private ReactiveWebApplicationContextRunner createReactiveWebContextRunner() {
return new ReactiveWebApplicationContextRunner()
.withUserConfiguration(BaseConfiguration.class).withConfiguration(
AutoConfigurations.of(RSocketServerAutoConfiguration.class));
}
@Configuration
static class BaseConfiguration {
@Bean
public MessageHandlerAcceptor messageHandlerAcceptor() {
MessageHandlerAcceptor messageHandlerAcceptor = new MessageHandlerAcceptor();
messageHandlerAcceptor.setRSocketStrategies(RSocketStrategies.builder()
.encoder(CharSequenceEncoder.textPlainOnly())
.decoder(StringDecoder.textPlainOnly()).build());
return messageHandlerAcceptor;
}
}
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.rsocket;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.rsocket.messaging.RSocketStrategiesCustomizer;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.messaging.rsocket.RSocketStrategies;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link RSocketStrategiesAutoConfiguration}
*
* @author Brian Clozel
*/
public class RSocketStrategiesAutoConfigurationTests {
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withUserConfiguration(BaseConfiguration.class).withConfiguration(
AutoConfigurations.of(RSocketStrategiesAutoConfiguration.class));
@Test
public void shouldCreateDefaultBeans() {
this.contextRunner.run((context) -> {
assertThat(context).getBeans(RSocketStrategies.class).hasSize(1);
RSocketStrategies strategies = context.getBean(RSocketStrategies.class);
assertThat(strategies.decoders()).hasSize(1)
.hasOnlyElementsOfType(Jackson2JsonDecoder.class);
assertThat(strategies.encoders()).hasSize(1)
.hasOnlyElementsOfType(Jackson2JsonEncoder.class);
});
}
@Test
public void shouldUseCustomStrategies() {
this.contextRunner.withUserConfiguration(UserStrategies.class).run((context) -> {
assertThat(context).getBeans(RSocketStrategies.class).hasSize(1);
assertThat(context.getBeanNamesForType(RSocketStrategies.class))
.contains("customRSocketStrategies");
});
}
@Test
public void shouldUseStrategiesCustomizer() {
this.contextRunner.withUserConfiguration(StrategiesCustomizer.class)
.run((context) -> {
assertThat(context).getBeans(RSocketStrategies.class).hasSize(1);
RSocketStrategies strategies = context
.getBean(RSocketStrategies.class);
assertThat(strategies.decoders()).hasSize(2)
.hasAtLeastOneElementOfType(StringDecoder.class);
assertThat(strategies.encoders()).hasSize(2)
.hasAtLeastOneElementOfType(CharSequenceEncoder.class);
});
}
@Configuration
static class BaseConfiguration {
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper();
}
}
@Configuration
static class UserStrategies {
@Bean
public RSocketStrategies customRSocketStrategies() {
return RSocketStrategies.builder()
.encoder(CharSequenceEncoder.textPlainOnly())
.decoder(StringDecoder.textPlainOnly()).build();
}
}
@Configuration
static class StrategiesCustomizer {
@Bean
public RSocketStrategiesCustomizer myCustomizer() {
return (strategies) -> strategies.encoder(CharSequenceEncoder.textPlainOnly())
.decoder(StringDecoder.textPlainOnly());
}
}
}
......@@ -166,6 +166,7 @@
<reactor-bom.version>Californium-SR6</reactor-bom.version>
<rest-assured.version>3.3.0</rest-assured.version>
<reactive-streams.version>1.0.2</reactive-streams.version>
<rsocket.version>0.12.1-RC3</rsocket.version>
<rxjava.version>1.3.8</rxjava.version>
<rxjava-adapter.version>1.2.1</rxjava-adapter.version>
<rxjava2.version>2.2.8</rxjava2.version>
......@@ -509,6 +510,11 @@
<artifactId>spring-boot-starter-oauth2-resource-server</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
......@@ -516,7 +522,7 @@
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<artifactId>spring-boot-starter-rsocket</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
......@@ -1067,6 +1073,16 @@
<artifactId>xml-path</artifactId>
<version>${rest-assured.version}</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>${rsocket.version}</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>${rsocket.version}</version>
</dependency>
<dependency>
<groupId>io.searchbox</groupId>
<artifactId>jest</artifactId>
......
......@@ -63,6 +63,10 @@ include::{generated-resources-root}/config-docs/server.adoc[]
include::{generated-resources-root}/config-docs/security.adoc[]
=== RSocket properties
include::../../../target/generated-resources/config-docs/rsocket.adoc[]
=== Actuator properties
include::{generated-resources-root}/config-docs/actuator.adoc[]
......
......@@ -3437,6 +3437,83 @@ both clients and servers.
You can learn more about the resource configuration on the client side in the
<<boot-features-webclient-runtime, WebClient Runtime section>>.
[[boot-features-rsocket]]
== RSocket
https://rsocket.io[RSocket] is a binary protocol for use on byte stream transports.
It enables symmetric interaction models via async message passing over a single connection.
Spring Framework, with the Spring Messaging module supports RSocket both on the server and
the client side. On the server side, it lets you create special `@Controller` beans
to handle incoming RSocket messages.
Methods in your controller are mapped to RSocket routes by using `@MessageMapping` annotations.
The following code shows a typical `@Controller`:
[source,java,indent=0]
----
@Controller
public class MyRSocketController {
@MessageMapping("chat.room.{name}")
public Flux<ChatMessages> enterChatRoom(@DestinationVariable String chatRoom,
Flux<ChatMessages> messages) {
// ...
}
@MessageMapping("users.{user}.info")
Mono<ChatUserInfo> getUserInfo(@DestinationVariable String user) {
// ...
}
}
----
[[boot-features-rsocket-server-auto-configuration]]
=== RSocket server Auto-configuration
Spring Boot provides auto-configuration for RSocket servers. The required dependencies
are provided by the `spring-boot-starter-rsocket`.
Spring Boot will start an RSocket server as a new embedded server in your application,
or will plug the RSocket infrastructure into an existing reactive Web server. This
depends on the type of application and its configuration.
In case of a WebFlux application (i.e. of type `WebApplicationType.REACTIVE`), the
RSocket server will be plugged into the existing Web Server only if the following
properties match:
[source,properties,indent=0,subs="verbatim,quotes,attributes"]
----
spring.rsocket.server.mapping-path=/rsocket # a mapping path is defined
spring.rsocket.server.transport=websocket # websocket is chosen as a transport
#spring.rsocket.server.port= # no port is defined
----
The only other way to create an RSocket server is to start an independent, embedded
RSocket server. Besides the dependency requirements, the only required configuration
is to define a port for that server:
[source,properties,indent=0,subs="verbatim,quotes,attributes"]
----
spring.rsocket.server.port=9898 # the only required configuration
spring.rsocket.server.transport=tcp # you're free to configure other properties
----
[[boot-features-rsocket-messaging]]
=== Spring Messaging RSocket support
Spring Boot will auto-configure the Spring Messaging infrastructure for RSocket.
An `RSocketStrategies` bean is created to provide encoding and decoding support
for RSocket messages. By default, Spring Boot will try to auto-configure JSON
support with Jackson for `application/json` and `"application/*+json"` media types.
Check out the <<boot-features-json-jackson,Jackson support section>> to know more
about customization possibilities.
Developers can create `RSocketStrategiesCustomizer` beans to add other strategies,
assuming there are `Encoder` and `Decoder` implementations available.
[[boot-features-security]]
......
......@@ -40,6 +40,8 @@ def generateConfigMetadataDocumentation() {
"spring.mvc", "spring.resources", "spring.webflux")
.addSection("json")
.withKeyPrefixes("spring.jackson", "spring.gson")
.addSection("rsocket")
.withKeyPrefixes("spring.rsocket")
.addSection("templating")
.withKeyPrefixes("spring.freemarker", "spring.groovy", "spring.mustache", "spring.thymeleaf")
.addOverride("spring.groovy.template.configuration", "See GroovyMarkupConfigurer")
......
......@@ -61,6 +61,7 @@
<module>spring-boot-starter-parent</module>
<module>spring-boot-starter-quartz</module>
<module>spring-boot-starter-reactor-netty</module>
<module>spring-boot-starter-rsocket</module>
<module>spring-boot-starter-security</module>
<module>spring-boot-starter-test</module>
<module>spring-boot-starter-thymeleaf</module>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2012-2019 the original author or authors.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ https://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starters</artifactId>
<version>${revision}</version>
</parent>
<artifactId>spring-boot-starter-rsocket</artifactId>
<name>Spring Boot RSocket Starter</name>
<description>Starter for building RSocket clients and servers.</description>
<properties>
<main.basedir>${basedir}/../../..</main.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
</dependencies>
</project>
......@@ -81,6 +81,16 @@
<artifactId>netty-tcnative-boringssl-static</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.undertow</groupId>
<artifactId>undertow-servlet</artifactId>
......@@ -258,6 +268,11 @@
<artifactId>slf4j-api</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId>
......
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.rsocket.context;
import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.context.ApplicationEvent;
/**
* Event to be published after the application context is refreshed and the
* {@link RSocketServer} is ready. Useful for obtaining the local port of a running
* server.
*
* @author Brian Clozel
* @since 2.2.0
*/
@SuppressWarnings("serial")
public class RSocketServerInitializedEvent extends ApplicationEvent {
public RSocketServerInitializedEvent(RSocketServer rSocketServer) {
super(rSocketServer);
}
/**
* Access the {@link RSocketServer}.
* @return the embedded RSocket server
*/
public RSocketServer getrSocketServer() {
return getSource();
}
/**
* Access the source of the event (an {@link RSocketServer}).
* @return the embedded web server
*/
@Override
public RSocketServer getSource() {
return (RSocketServer) super.getSource();
}
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.rsocket.messaging;
import org.springframework.messaging.rsocket.RSocketStrategies;
/**
* Callback interface that can be used to customize codecs configuration for an RSocket
* client and/or server with {@link RSocketStrategies}.
*
* @author Brian Clozel
* @since 2.2.0
*/
@FunctionalInterface
public interface RSocketStrategiesCustomizer {
/**
* Callback to customize a {@link RSocketStrategies.Builder} instance.
* @param strategies rSocket codec strategies to customize
*/
void customize(RSocketStrategies.Builder strategies);
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.rsocket.netty;
import org.springframework.boot.rsocket.context.RSocketServerInitializedEvent;
import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.rsocket.MessageHandlerAcceptor;
/**
* Bootstrap an {@link RSocketServer} and start it with the application context.
*
* @author Brian Clozel
* @since 2.2.0
*/
public class NettyRSocketBootstrap
implements ApplicationEventPublisherAware, SmartLifecycle {
private final RSocketServer rSocketServer;
private ApplicationEventPublisher applicationEventPublisher;
public NettyRSocketBootstrap(RSocketServerFactory serverFactoryProvider,
MessageHandlerAcceptor messageHandlerAcceptorProvider) {
this.rSocketServer = serverFactoryProvider.create(messageHandlerAcceptorProvider);
}
@Override
public void setApplicationEventPublisher(
ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
@Override
public void start() {
this.rSocketServer.start();
this.applicationEventPublisher
.publishEvent(new RSocketServerInitializedEvent(this.rSocketServer));
}
@Override
public void stop() {
this.rSocketServer.stop();
}
@Override
public boolean isRunning() {
RSocketServer server = this.rSocketServer;
if (server != null) {
return server.address() != null;
}
return false;
}
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.rsocket.netty;
import java.net.InetSocketAddress;
import java.time.Duration;
import io.rsocket.transport.netty.server.CloseableChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.boot.rsocket.server.RSocketServerException;
import org.springframework.util.Assert;
/**
* {@link RSocketServer} that is based on a Reactor Netty server. Usually this class
* should be created using the {@link NettyRSocketServerFactory} and not directly.
*
* @author Brian Clozel
* @since 2.2.0
*/
public class NettyRSocketServer implements RSocketServer {
private static final Log logger = LogFactory.getLog(NettyRSocketServer.class);
private final Mono<CloseableChannel> starter;
private final Duration lifecycleTimeout;
private CloseableChannel channel;
public NettyRSocketServer(Mono<CloseableChannel> starter, Duration lifecycleTimeout) {
Assert.notNull(starter, "starter must not be null");
this.starter = starter;
this.lifecycleTimeout = lifecycleTimeout;
}
@Override
public InetSocketAddress address() {
if (this.channel != null) {
return this.channel.address();
}
return null;
}
@Override
public void start() throws RSocketServerException {
if (this.lifecycleTimeout != null) {
this.channel = this.starter.block(this.lifecycleTimeout);
}
else {
this.channel = this.starter.block();
}
logger.info("Netty RSocket started on port(s): " + address().getPort());
startDaemonAwaitThread(this.channel);
}
private void startDaemonAwaitThread(CloseableChannel channel) {
Thread awaitThread = new Thread("rsocket") {
@Override
public void run() {
channel.onClose().block();
}
};
awaitThread.setContextClassLoader(getClass().getClassLoader());
awaitThread.setDaemon(false);
awaitThread.start();
}
@Override
public void stop() throws RSocketServerException {
if (this.channel != null) {
this.channel.dispose();
this.channel = null;
}
}
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.rsocket.netty;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;
import reactor.netty.tcp.TcpServer;
import org.springframework.boot.rsocket.server.ConfigurableRSocketServerFactory;
import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.util.Assert;
/**
* {@link RSocketServerFactory} that can be used to create {@link RSocketServer}s backed
* by Netty.
*
* @author Brian Clozel
* @since 2.2.0
*/
public class NettyRSocketServerFactory
implements RSocketServerFactory, ConfigurableRSocketServerFactory {
private int port = 9898;
private InetAddress address;
private RSocketServer.TRANSPORT transport = RSocketServer.TRANSPORT.TCP;
private ReactorResourceFactory resourceFactory;
private Duration lifecycleTimeout;
private List<ServerRSocketFactoryCustomizer> serverCustomizers = new ArrayList<>();
@Override
public void setPort(int port) {
this.port = port;
}
@Override
public void setAddress(InetAddress address) {
this.address = address;
}
@Override
public void setTransport(RSocketServer.TRANSPORT transport) {
this.transport = transport;
}
/**
* Set the {@link ReactorResourceFactory} to get the shared resources from.
* @param resourceFactory the server resources
*/
public void setResourceFactory(ReactorResourceFactory resourceFactory) {
this.resourceFactory = resourceFactory;
}
/**
* Set {@link ServerRSocketFactoryCustomizer}s that should be applied to the RSocket
* server builder. Calling this method will replace any existing customizers.
* @param serverCustomizers the customizers to set
*/
public void setServerCustomizers(
Collection<? extends ServerRSocketFactoryCustomizer> serverCustomizers) {
Assert.notNull(serverCustomizers, "ServerCustomizers must not be null");
this.serverCustomizers = new ArrayList<>(serverCustomizers);
}
/**
* Add {@link ServerRSocketFactoryCustomizer}s that should applied while building the
* server.
* @param serverCustomizers the customizers to add
*/
public void addServerCustomizers(
ServerRSocketFactoryCustomizer... serverCustomizers) {
Assert.notNull(serverCustomizers, "ServerCustomizer must not be null");
this.serverCustomizers.addAll(Arrays.asList(serverCustomizers));
}
/**
* Set the maximum amount of time that should be waited when starting or stopping the
* server.
* @param lifecycleTimeout the lifecycle timeout
*/
public void setLifecycleTimeout(Duration lifecycleTimeout) {
this.lifecycleTimeout = lifecycleTimeout;
}
@Override
public NettyRSocketServer create(SocketAcceptor socketAcceptor) {
ServerTransport<CloseableChannel> transport = createTransport();
RSocketFactory.ServerRSocketFactory factory = RSocketFactory.receive();
for (ServerRSocketFactoryCustomizer customizer : this.serverCustomizers) {
factory = customizer.apply(factory);
}
Mono<CloseableChannel> starter = factory.acceptor(socketAcceptor)
.transport(transport).start();
return new NettyRSocketServer(starter, this.lifecycleTimeout);
}
private ServerTransport<CloseableChannel> createTransport() {
if (this.transport == RSocketServer.TRANSPORT.WEBSOCKET) {
if (this.resourceFactory != null) {
HttpServer httpServer = HttpServer.create()
.tcpConfiguration((tcpServer) -> tcpServer
.runOn(this.resourceFactory.getLoopResources()));
return WebsocketServerTransport.create(httpServer);
}
else {
return WebsocketServerTransport.create(getListenAddress());
}
}
else {
if (this.resourceFactory != null) {
TcpServer tcpServer = TcpServer.create()
.runOn(this.resourceFactory.getLoopResources())
.addressSupplier(this::getListenAddress);
return TcpServerTransport.create(tcpServer);
}
else {
return TcpServerTransport.create(getListenAddress());
}
}
}
private InetSocketAddress getListenAddress() {
if (this.address != null) {
return new InetSocketAddress(this.address.getHostAddress(), this.port);
}
return new InetSocketAddress(this.port);
}
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.rsocket.server;
import java.net.InetAddress;
/**
* A configurable {@link RSocketServerFactory}.
*
* @author Brian Clozel
* @since 2.2.0
*/
public interface ConfigurableRSocketServerFactory {
/**
* Set the port that the server should listen on. If not specified port '9898' will be
* used.
* @param port the port to set
*/
void setPort(int port);
/**
* Set the specific network address that the server should bind to.
* @param address the address to set (defaults to {@code null})
*/
void setAddress(InetAddress address);
/**
* Set the transport that the RSocket server should use.
* @param transport the transport protocol to use
*/
void setTransport(RSocketServer.TRANSPORT transport);
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.rsocket.server;
import java.net.InetSocketAddress;
/**
* Simple interface that represents a fully configured RSocket server. Allows the server
* to be {@link #start() started} and {@link #stop() stopped}.
*
* @author Brian Clozel
* @since 2.2.0
*/
public interface RSocketServer {
/**
* Starts the RSocket server. Calling this method on an already started server has no
* effect.
* @throws RSocketServerException if the server cannot be started
*/
void start() throws RSocketServerException;
/**
* Stops the RSocket server. Calling this method on an already stopped server has no
* effect.
* @throws RSocketServerException if the server cannot be stopped
*/
void stop() throws RSocketServerException;
/**
* Return the address this server is listening on.
* @return the address
*/
InetSocketAddress address();
/**
* Choice of transport protocol for the RSocket server.
*/
enum TRANSPORT {
TCP, WEBSOCKET
}
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.rsocket.server;
/**
* Exceptions thrown by an RSocket server.
*
* @author Brian Clozel
* @since 2.2.0
*/
@SuppressWarnings("serial")
public class RSocketServerException extends RuntimeException {
public RSocketServerException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.rsocket.server;
import io.rsocket.SocketAcceptor;
/**
* Factory interface that can be used to create a reactive {@link RSocketServer}.
*
* @author Brian Clozel
* @since 2.2.0
*/
@FunctionalInterface
public interface RSocketServerFactory {
/**
* Gets a new fully configured but paused {@link RSocketServer} instance. Clients
* should not be able to connect to the returned server until
* {@link RSocketServer#start()} is called (which happens when the
* {@code ApplicationContext} has been fully refreshed).
* @param socketAcceptor the socket acceptor
* @return a fully configured and started {@link RSocketServer}
* @see RSocketServer#stop()
*/
RSocketServer create(SocketAcceptor socketAcceptor);
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.rsocket.server;
import java.util.function.Function;
import io.rsocket.RSocketFactory;
/**
* Mapping function that can be used to customize an RSocket server factory.
*
* @author Brian Clozel
* @see RSocketServerFactory
* @since 2.2.0
*/
@FunctionalInterface
public interface ServerRSocketFactoryCustomizer extends
Function<RSocketFactory.ServerRSocketFactory, RSocketFactory.ServerRSocketFactory> {
}
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.rsocket.netty;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import io.netty.buffer.PooledByteBufAllocator;
import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.util.DefaultPayload;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.InOrder;
import reactor.core.publisher.Mono;
import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer;
import org.springframework.boot.testsupport.rule.OutputCapture;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.SocketUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link NettyRSocketServerFactory}
*
* @author Brian Clozel
*/
public class NettyRSocketServerFactoryTests {
@Rule
public OutputCapture output = new OutputCapture();
private NettyRSocketServer rSocketServer;
private RSocketRequester requester;
private static final Duration TIMEOUT = Duration.ofSeconds(3);
@After
public void tearDown() {
if (this.rSocketServer != null) {
try {
this.rSocketServer.stop();
}
catch (Exception ex) {
// Ignore
}
}
if (this.requester != null) {
this.requester.rsocket().dispose();
}
}
private NettyRSocketServerFactory getFactory() {
return new NettyRSocketServerFactory();
}
@Test
public void specificPort() {
NettyRSocketServerFactory factory = getFactory();
int specificPort = SocketUtils.findAvailableTcpPort(41000);
factory.setPort(specificPort);
this.rSocketServer = factory.create(new EchoRequestResponseAcceptor());
this.rSocketServer.start();
this.requester = getRSocketRequester(createRSocketTcpClient());
String payload = "test payload";
String response = this.requester.route("test").data(payload)
.retrieveMono(String.class).block(TIMEOUT);
assertThat(response).isEqualTo(payload);
assertThat(this.rSocketServer.address().getPort()).isEqualTo(specificPort);
}
@Test
public void websocketTransport() {
NettyRSocketServerFactory factory = getFactory();
factory.setTransport(RSocketServer.TRANSPORT.WEBSOCKET);
this.rSocketServer = factory.create(new EchoRequestResponseAcceptor());
this.rSocketServer.start();
this.requester = getRSocketRequester(createRSocketWebSocketClient());
String payload = "test payload";
String response = this.requester.route("test").data(payload)
.retrieveMono(String.class).block(TIMEOUT);
assertThat(response).isEqualTo(payload);
}
@Test
public void serverCustomizers() {
NettyRSocketServerFactory factory = getFactory();
ServerRSocketFactoryCustomizer[] customizers = new ServerRSocketFactoryCustomizer[2];
for (int i = 0; i < customizers.length; i++) {
customizers[i] = mock(ServerRSocketFactoryCustomizer.class);
given(customizers[i].apply(any(RSocketFactory.ServerRSocketFactory.class)))
.will((invocation) -> invocation.getArgument(0));
}
factory.setServerCustomizers(Arrays.asList(customizers[0], customizers[1]));
this.rSocketServer = factory.create(new EchoRequestResponseAcceptor());
InOrder ordered = inOrder((Object[]) customizers);
for (ServerRSocketFactoryCustomizer customizer : customizers) {
ordered.verify(customizer)
.apply(any(RSocketFactory.ServerRSocketFactory.class));
}
}
private RSocket createRSocketTcpClient() {
Assertions.assertThat(this.rSocketServer).isNotNull();
InetSocketAddress address = this.rSocketServer.address();
return RSocketFactory.connect().dataMimeType(MimeTypeUtils.TEXT_PLAIN_VALUE)
.transport(TcpClientTransport.create(address)).start().block();
}
private RSocket createRSocketWebSocketClient() {
Assertions.assertThat(this.rSocketServer).isNotNull();
InetSocketAddress address = this.rSocketServer.address();
return RSocketFactory.connect().dataMimeType(MimeTypeUtils.TEXT_PLAIN_VALUE)
.transport(WebsocketClientTransport.create(address)).start().block();
}
private RSocketRequester getRSocketRequester(RSocket rSocketClient) {
RSocketStrategies strategies = RSocketStrategies.builder()
.decoder(StringDecoder.allMimeTypes())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(
new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))
.build();
return RSocketRequester.create(rSocketClient, MimeTypeUtils.TEXT_PLAIN,
strategies);
}
static class EchoRequestResponseAcceptor implements SocketAcceptor {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload,
RSocket rSocket) {
return Mono.just(new AbstractRSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
return Mono.just(DefaultPayload.create(payload));
}
});
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment