Commit 3967e76b authored by Artem Bilan's avatar Artem Bilan Committed by Brian Clozel

Auto-Configure RSocket support for Spring Integration

This commit adds a new auto-configuration for RSocket support in Spring
Integration.

Given an application with `spring-messaging`, `spring-integration-rsocket`
and RSocket dependencies, developers are now able to leverage Spring
Integration features with RSocket.

It is now possible to configure an RSocket server with
`"spring.rsocket.server.*"` properties and let it use
`IntegrationRSocketEndpoint` or `RSocketOutboundGateway` components to
handle incoming RSocket messages. This infrastructure can handle Spring
Integration RSocket channel adapters and `@MessageMapping` handlers
(given `"spring.integration.rsocket.server.message-mapping-enabled"`is
configured.

If the `"spring.integration.rsocket.client.host"` and
`"spring.integration.rsocket.client.port"` (for TCP protocol), or
`"spring.integration.rsocket.client.uri"`  (for WebSocket) is configured
then a `ClientRSocketConnector` will be configured accordingly.

Closes gh-18834
Co-authored-by: 's avatarBrian Clozel <bclozel@pivotal.io>
parent 8dd0ef9b
...@@ -104,6 +104,7 @@ dependencies { ...@@ -104,6 +104,7 @@ dependencies {
optional("org.springframework.integration:spring-integration-core") optional("org.springframework.integration:spring-integration-core")
optional("org.springframework.integration:spring-integration-jdbc") optional("org.springframework.integration:spring-integration-jdbc")
optional("org.springframework.integration:spring-integration-jmx") optional("org.springframework.integration:spring-integration-jmx")
optional("org.springframework.integration:spring-integration-rsocket")
optional("org.springframework:spring-jms") optional("org.springframework:spring-jms")
optional("org.springframework:spring-orm") optional("org.springframework:spring-orm")
optional("org.springframework:spring-tx") optional("org.springframework:spring-tx")
......
/* /*
* Copyright 2012-2019 the original author or authors. * Copyright 2012-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -19,8 +19,13 @@ package org.springframework.boot.autoconfigure.integration; ...@@ -19,8 +19,13 @@ package org.springframework.boot.autoconfigure.integration;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.sql.DataSource; import javax.sql.DataSource;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.server.TcpServerTransport;
import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
...@@ -29,8 +34,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandi ...@@ -29,8 +34,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandi
import org.springframework.boot.autoconfigure.condition.SearchStrategy; import org.springframework.boot.autoconfigure.condition.SearchStrategy;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration; import org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
...@@ -42,6 +49,14 @@ import org.springframework.integration.gateway.GatewayProxyFactoryBean; ...@@ -42,6 +49,14 @@ import org.springframework.integration.gateway.GatewayProxyFactoryBean;
import org.springframework.integration.jdbc.store.JdbcMessageStore; import org.springframework.integration.jdbc.store.JdbcMessageStore;
import org.springframework.integration.jmx.config.EnableIntegrationMBeanExport; import org.springframework.integration.jmx.config.EnableIntegrationMBeanExport;
import org.springframework.integration.monitor.IntegrationMBeanExporter; import org.springframework.integration.monitor.IntegrationMBeanExporter;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
/** /**
...@@ -138,4 +153,100 @@ public class IntegrationAutoConfiguration { ...@@ -138,4 +153,100 @@ public class IntegrationAutoConfiguration {
} }
/**
* Integration RSocket configuration.
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ IntegrationRSocketEndpoint.class, RSocketRequester.class, RSocketFactory.class })
@Conditional(IntegrationRSocketConfiguration.AnyRSocketChannelAdapterAvailable.class)
protected static class IntegrationRSocketConfiguration {
/**
* Check if either a {@link IntegrationRSocketEndpoint} or
* {@link RSocketOutboundGateway} bean is available.
*/
static class AnyRSocketChannelAdapterAvailable extends AnyNestedCondition {
AnyRSocketChannelAdapterAvailable() {
super(ConfigurationPhase.REGISTER_BEAN);
}
@ConditionalOnBean(IntegrationRSocketEndpoint.class)
static class IntegrationRSocketEndpointAvailable {
}
@ConditionalOnBean(RSocketOutboundGateway.class)
static class RSocketOutboundGatewayAvailable {
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(TcpServerTransport.class)
@AutoConfigureBefore(RSocketMessagingAutoConfiguration.class)
protected static class IntegrationRSocketServerConfiguration {
@Bean
@ConditionalOnMissingBean(ServerRSocketMessageHandler.class)
public RSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies,
IntegrationProperties integrationProperties) {
RSocketMessageHandler messageHandler = new ServerRSocketMessageHandler(
integrationProperties.getRSocket().getServer().isMessageMappingEnabled());
messageHandler.setRSocketStrategies(rSocketStrategies);
return messageHandler;
}
@Bean
@ConditionalOnMissingBean
public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler messageHandler) {
return new ServerRSocketConnector(messageHandler);
}
}
@Configuration(proxyBeanMethods = false)
protected static class IntegrationRSocketClientConfiguration {
@Bean
@ConditionalOnMissingBean
@Conditional(RemoteRSocketServerAddressConfigured.class)
public ClientRSocketConnector clientRSocketConnector(IntegrationProperties integrationProperties,
RSocketStrategies rSocketStrategies) {
IntegrationProperties.RSocket.Client client = integrationProperties.getRSocket().getClient();
ClientRSocketConnector clientRSocketConnector = (client.getUri() != null)
? new ClientRSocketConnector(client.getUri())
: new ClientRSocketConnector(client.getHost(), client.getPort());
clientRSocketConnector.setRSocketStrategies(rSocketStrategies);
return clientRSocketConnector;
}
/**
* Check if a remote address is configured for the RSocket Integration client.
*/
static class RemoteRSocketServerAddressConfigured extends AnyNestedCondition {
RemoteRSocketServerAddressConfigured() {
super(ConfigurationPhase.REGISTER_BEAN);
}
@ConditionalOnProperty(prefix = "spring.integration.rsocket.client", name = "uri")
static class WebSocketAddressConfigured {
}
@ConditionalOnProperty(prefix = "spring.integration.rsocket.client", name = { "host", "port" })
static class TcpAddressConfigured {
}
}
}
}
} }
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
package org.springframework.boot.autoconfigure.integration; package org.springframework.boot.autoconfigure.integration;
import java.net.URI;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceInitializationMode; import org.springframework.boot.jdbc.DataSourceInitializationMode;
...@@ -24,6 +26,7 @@ import org.springframework.boot.jdbc.DataSourceInitializationMode; ...@@ -24,6 +26,7 @@ import org.springframework.boot.jdbc.DataSourceInitializationMode;
* *
* @author Vedran Pavic * @author Vedran Pavic
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Artem Bilan
* @since 2.0.0 * @since 2.0.0
*/ */
@ConfigurationProperties(prefix = "spring.integration") @ConfigurationProperties(prefix = "spring.integration")
...@@ -31,10 +34,16 @@ public class IntegrationProperties { ...@@ -31,10 +34,16 @@ public class IntegrationProperties {
private final Jdbc jdbc = new Jdbc(); private final Jdbc jdbc = new Jdbc();
private final RSocket rsocket = new RSocket();
public Jdbc getJdbc() { public Jdbc getJdbc() {
return this.jdbc; return this.jdbc;
} }
public RSocket getRSocket() {
return this.rsocket;
}
public static class Jdbc { public static class Jdbc {
private static final String DEFAULT_SCHEMA_LOCATION = "classpath:org/springframework/" private static final String DEFAULT_SCHEMA_LOCATION = "classpath:org/springframework/"
...@@ -68,4 +77,80 @@ public class IntegrationProperties { ...@@ -68,4 +77,80 @@ public class IntegrationProperties {
} }
public static class RSocket {
private final Client client = new Client();
private final Server server = new Server();
public Client getClient() {
return this.client;
}
public Server getServer() {
return this.server;
}
public static class Client {
/**
* TCP RSocket server host to connect to.
*/
private String host;
/**
* TCP RSocket server port to connect to.
*/
private Integer port;
/**
* WebSocket RSocket server uri to connect to.
*/
private URI uri;
public void setHost(String host) {
this.host = host;
}
public String getHost() {
return this.host;
}
public void setPort(Integer port) {
this.port = port;
}
public Integer getPort() {
return this.port;
}
public void setUri(URI uri) {
this.uri = uri;
}
public URI getUri() {
return this.uri;
}
}
public static class Server {
/**
* Whether to handle message mapping for RSocket via Spring Integration.
*/
boolean messageMappingEnabled;
public boolean isMessageMappingEnabled() {
return this.messageMappingEnabled;
}
public void setMessageMappingEnabled(boolean messageMappingEnabled) {
this.messageMappingEnabled = messageMappingEnabled;
}
}
}
} }
/* /*
* Copyright 2012-2019 the original author or authors. * Copyright 2012-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -18,14 +18,22 @@ package org.springframework.boot.autoconfigure.integration; ...@@ -18,14 +18,22 @@ package org.springframework.boot.autoconfigure.integration;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration.IntegrationComponentScanConfiguration; import org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration.IntegrationComponentScanConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration; import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration;
import org.springframework.boot.autoconfigure.jdbc.JdbcTemplateAutoConfiguration; import org.springframework.boot.autoconfigure.jdbc.JdbcTemplateAutoConfiguration;
import org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration; import org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketRequesterAutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration;
import org.springframework.boot.jdbc.DataSourceInitializationMode; import org.springframework.boot.jdbc.DataSourceInitializationMode;
import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
...@@ -38,10 +46,16 @@ import org.springframework.integration.core.MessageSource; ...@@ -38,10 +46,16 @@ import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MessageProcessorMessageSource; import org.springframework.integration.endpoint.MessageProcessorMessageSource;
import org.springframework.integration.gateway.RequestReplyExchanger; import org.springframework.integration.gateway.RequestReplyExchanger;
import org.springframework.integration.handler.MessageProcessor; import org.springframework.integration.handler.MessageProcessor;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
import org.springframework.integration.support.channel.HeaderChannelRegistry; import org.springframework.integration.support.channel.HeaderChannelRegistry;
import org.springframework.jdbc.BadSqlGrammarException; import org.springframework.jdbc.BadSqlGrammarException;
import org.springframework.jdbc.core.JdbcOperations; import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jmx.export.MBeanExporter; import org.springframework.jmx.export.MBeanExporter;
import org.springframework.messaging.Message;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
...@@ -188,6 +202,33 @@ class IntegrationAutoConfigurationTests { ...@@ -188,6 +202,33 @@ class IntegrationAutoConfigurationTests {
}); });
} }
@Test
void rsocketSupportEnabled() {
this.contextRunner.withUserConfiguration(RSocketServerConfiguration.class)
.withConfiguration(AutoConfigurations.of(RSocketServerAutoConfiguration.class,
RSocketStrategiesAutoConfiguration.class, RSocketMessagingAutoConfiguration.class,
RSocketRequesterAutoConfiguration.class, IntegrationAutoConfiguration.class))
.withPropertyValues("spring.rsocket.server.port=0", "spring.integration.rsocket.client.port=0",
"spring.integration.rsocket.client.host=localhost",
"spring.integration.rsocket.server.message-mapping-enabled=true")
.run((context) -> {
assertThat(context).hasSingleBean(ClientRSocketConnector.class).hasBean("clientRSocketConnector")
.hasSingleBean(ServerRSocketConnector.class)
.hasSingleBean(ServerRSocketMessageHandler.class)
.hasSingleBean(RSocketMessageHandler.class);
ServerRSocketMessageHandler serverRSocketMessageHandler = context
.getBean(ServerRSocketMessageHandler.class);
assertThat(context).getBean(RSocketMessageHandler.class).isSameAs(serverRSocketMessageHandler);
ClientRSocketConnector clientRSocketConnector = context.getBean(ClientRSocketConnector.class);
ClientTransport clientTransport = (ClientTransport) new DirectFieldAccessor(clientRSocketConnector)
.getPropertyValue("clientTransport");
assertThat(clientTransport).isInstanceOf(TcpClientTransport.class);
});
}
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
static class CustomMBeanExporter { static class CustomMBeanExporter {
...@@ -220,4 +261,26 @@ class IntegrationAutoConfigurationTests { ...@@ -220,4 +261,26 @@ class IntegrationAutoConfigurationTests {
} }
@Configuration(proxyBeanMethods = false)
static class RSocketServerConfiguration {
@Bean
IntegrationRSocketEndpoint mockIntegrationRSocketEndpoint() {
return new IntegrationRSocketEndpoint() {
@Override
public Mono<Void> handleMessage(Message<?> message) {
return null;
}
@Override
public String[] getPath() {
return new String[] { "/rsocketTestPath" };
}
};
}
}
} }
...@@ -5932,6 +5932,21 @@ If `spring-integration-jdbc` is available, the default database schema can be cr ...@@ -5932,6 +5932,21 @@ If `spring-integration-jdbc` is available, the default database schema can be cr
spring.integration.jdbc.initialize-schema=always spring.integration.jdbc.initialize-schema=always
---- ----
If `spring-integration-rsocket` is available, developers can configure an RSocket server using `"spring.rsocket.server.*"` properties and let it use `IntegrationRSocketEndpoint` or `RSocketOutboundGateway` components to handle incoming RSocket messages.
This infrastructure can handle Spring Integration RSocket channel adapters and `@MessageMapping` handlers (given `"spring.integration.rsocket.server.message-mapping-enabled"` is configured).
Spring Boot can also auto-configure an `ClientRSocketConnector` using configuration properties:
[source,properties,indent=0,configprops]
----
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
----
See the {spring-boot-autoconfigure-module-code}/integration/IntegrationAutoConfiguration.java[`IntegrationAutoConfiguration`] and {spring-boot-autoconfigure-module-code}/integration/IntegrationProperties.java[`IntegrationProperties`] classes for more details. See the {spring-boot-autoconfigure-module-code}/integration/IntegrationAutoConfiguration.java[`IntegrationAutoConfiguration`] and {spring-boot-autoconfigure-module-code}/integration/IntegrationProperties.java[`IntegrationProperties`] classes for more details.
By default, if a Micrometer `meterRegistry` bean is present, Spring Integration metrics will be managed by Micrometer. By default, if a Micrometer `meterRegistry` bean is present, Spring Integration metrics will be managed by Micrometer.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment