diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java index 0ac10a729..fe6fbf3ef 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java @@ -19,13 +19,17 @@ package org.springframework.cloud.function.rsocket; import java.net.InetSocketAddress; import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.rsocket.context.RSocketServerBootstrap; +import org.springframework.boot.rsocket.server.RSocketServerFactory; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistration; @@ -57,8 +61,16 @@ class RSocketAutoConfiguration { @Bean public FunctionToRSocketBinder functionToDestinationBinder(FunctionCatalog functionCatalog, - FunctionProperties functionProperties, RSocketFunctionProperties rSocketFunctionProperties) { - return new FunctionToRSocketBinder(functionCatalog, functionProperties, rSocketFunctionProperties); + FunctionProperties functionProperties) { + return new FunctionToRSocketBinder(functionCatalog, functionProperties); + } + + @Bean + @ConditionalOnMissingBean + @ConditionalOnProperty("spring.rsocket.server.port") + RSocketServerBootstrap rSocketServerBootstrap(RSocketServerFactory rSocketServerFactory, + FunctionToRSocketBinder binder) { + return new RSocketServerBootstrap(rSocketServerFactory, SocketAcceptor.with(binder.getRSocket())); } /** @@ -70,19 +82,15 @@ class RSocketAutoConfiguration { private final FunctionProperties functionProperties; - private final RSocketFunctionProperties rSocketFunctionProperties; - private RSocketListenerFunction invocableFunction; private GenericApplicationContext context; private boolean started; - FunctionToRSocketBinder(FunctionCatalog functionCatalog, FunctionProperties functionProperties, - RSocketFunctionProperties rSocketFunctionProperties) { + FunctionToRSocketBinder(FunctionCatalog functionCatalog, FunctionProperties functionProperties) { this.functionCatalog = functionCatalog; this.functionProperties = functionProperties; - this.rSocketFunctionProperties = rSocketFunctionProperties; } @Override @@ -102,15 +110,7 @@ class RSocketAutoConfiguration { throw new UnsupportedOperationException("Supplier is not currently supported for RSocket interaction"); } - if (StringUtils.hasText(rSocketFunctionProperties.getBindAddress()) - && rSocketFunctionProperties.getBindPort() != null) { - InetSocketAddress bindAddress = InetSocketAddress.createUnresolved( - this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort()); - this.invocableFunction = new RSocketListenerFunction(function, bindAddress); - } - else { - this.invocableFunction = new RSocketListenerFunction(function, null); - } + this.invocableFunction = new RSocketListenerFunction(function); } RSocket getRSocket() { diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java index 5b2ce3d11..f6c137ccc 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java @@ -24,6 +24,7 @@ import org.springframework.cloud.function.context.FunctionProperties; * The prefix for these properties is `spring.cloud.function.rscocket`. * * @author Oleg Zhurakousky + * @author Spencer Gibb * @since 3.1 */ @ConfigurationProperties(prefix = FunctionProperties.PREFIX + ".rsocket") @@ -31,10 +32,6 @@ public class RSocketFunctionProperties { private boolean enabled; - private String bindAddress; - - private Integer bindPort; - public boolean isEnabled() { return this.enabled; } @@ -42,20 +39,4 @@ public class RSocketFunctionProperties { public void setEnabled(boolean enabled) { this.enabled = enabled; } - - public String getBindAddress() { - return bindAddress; - } - - public void setBindAddress(String bindAddress) { - this.bindAddress = bindAddress; - } - - public Integer getBindPort() { - return bindPort; - } - - public void setBindPort(Integer bindPort) { - this.bindPort = bindPort; - } } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java index e52a1cba3..ae61989bc 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java @@ -17,7 +17,6 @@ package org.springframework.cloud.function.rsocket; import java.lang.reflect.Type; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.function.Function; @@ -55,15 +54,12 @@ class RSocketListenerFunction implements Function, Publisher, Publisher> apply(Message input) { if (logger.isDebugEnabled()) { - logger.debug("Executiing: " + this.targetFunction + " on " + this.listenAddress); + logger.debug("Executiing: " + this.targetFunction); } Object rawResult = this.targetFunction.apply(input); @@ -84,9 +80,6 @@ class RSocketListenerFunction implements Function, Publisher result = socket.requestResponse(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8); @@ -67,8 +66,7 @@ public class RSocketAutoConfigurationTests { new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run( "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.cloud.function.definition=uppercase", - "--spring.cloud.function.rsocket.bind-address=localhost", - "--spring.cloud.function.rsocket.bind-port=" + port); + "--spring.rsocket.server.port=" + port); RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null); Flux result = socket.requestStream(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8); @@ -86,8 +84,7 @@ public class RSocketAutoConfigurationTests { new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run( "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.cloud.function.definition=uppercase", - "--spring.cloud.function.rsocket.bind-address=localhost", - "--spring.cloud.function.rsocket.bind-port=" + port); + "--spring.rsocket.server.port=" + port); RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null); Flux result = socket.requestChannel(Flux.just( @@ -111,8 +108,7 @@ public class RSocketAutoConfigurationTests { new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run( "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.cloud.function.definition=uppercaseReactive", - "--spring.cloud.function.rsocket.bind-address=localhost", - "--spring.cloud.function.rsocket.bind-port=" + port); + "--spring.rsocket.server.port=" + port); RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null); @@ -131,8 +127,7 @@ public class RSocketAutoConfigurationTests { new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run( "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.cloud.function.definition=uppercaseReactive", - "--spring.cloud.function.rsocket.bind-address=localhost", - "--spring.cloud.function.rsocket.bind-port=" + port); + "--spring.rsocket.server.port=" + port); RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null); @@ -151,8 +146,7 @@ public class RSocketAutoConfigurationTests { new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run( "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.cloud.function.definition=uppercaseReactive", - "--spring.cloud.function.rsocket.bind-address=localhost", - "--spring.cloud.function.rsocket.bind-port=" + port); + "--spring.rsocket.server.port=" + port); RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null); @@ -179,14 +173,12 @@ public class RSocketAutoConfigurationTests { new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run( "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.cloud.function.definition=uppercase|concat", - "--spring.cloud.function.rsocket.bind-address=localhost", - "--spring.cloud.function.rsocket.bind-port=" + portA); + "--spring.rsocket.server.port=" + portA); new SpringApplicationBuilder(AdditionalFunctionConfiguration.class).web(WebApplicationType.NONE).run( "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.cloud.function.definition=reverse>localhost:" + portA + "|wrap", - "--spring.cloud.function.rsocket.bind-address=localhost", - "--spring.cloud.function.rsocket.bind-port=" + portB); + "--spring.rsocket.server.port=" + portB); RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", portB), null); Mono result = socket.requestResponse(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8); @@ -203,8 +195,7 @@ public class RSocketAutoConfigurationTests { new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run( "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.cloud.function.definition=uppercaseReactive", - "--spring.cloud.function.rsocket.bind-address=localhost", - "--spring.cloud.function.rsocket.bind-port=" + port); + "--spring.rsocket.server.port=" + port); RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);