Remove custom server creation logic and rely on boot instead
Resolves #579
This commit is contained in:
@@ -19,13 +19,17 @@ package org.springframework.cloud.function.rsocket;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.SocketAcceptor;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.boot.rsocket.context.RSocketServerBootstrap;
|
||||
import org.springframework.boot.rsocket.server.RSocketServerFactory;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.FunctionProperties;
|
||||
import org.springframework.cloud.function.context.FunctionRegistration;
|
||||
@@ -57,8 +61,16 @@ class RSocketAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public FunctionToRSocketBinder functionToDestinationBinder(FunctionCatalog functionCatalog,
|
||||
FunctionProperties functionProperties, RSocketFunctionProperties rSocketFunctionProperties) {
|
||||
return new FunctionToRSocketBinder(functionCatalog, functionProperties, rSocketFunctionProperties);
|
||||
FunctionProperties functionProperties) {
|
||||
return new FunctionToRSocketBinder(functionCatalog, functionProperties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
@ConditionalOnProperty("spring.rsocket.server.port")
|
||||
RSocketServerBootstrap rSocketServerBootstrap(RSocketServerFactory rSocketServerFactory,
|
||||
FunctionToRSocketBinder binder) {
|
||||
return new RSocketServerBootstrap(rSocketServerFactory, SocketAcceptor.with(binder.getRSocket()));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -70,19 +82,15 @@ class RSocketAutoConfiguration {
|
||||
|
||||
private final FunctionProperties functionProperties;
|
||||
|
||||
private final RSocketFunctionProperties rSocketFunctionProperties;
|
||||
|
||||
private RSocketListenerFunction invocableFunction;
|
||||
|
||||
private GenericApplicationContext context;
|
||||
|
||||
private boolean started;
|
||||
|
||||
FunctionToRSocketBinder(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
|
||||
RSocketFunctionProperties rSocketFunctionProperties) {
|
||||
FunctionToRSocketBinder(FunctionCatalog functionCatalog, FunctionProperties functionProperties) {
|
||||
this.functionCatalog = functionCatalog;
|
||||
this.functionProperties = functionProperties;
|
||||
this.rSocketFunctionProperties = rSocketFunctionProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -102,15 +110,7 @@ class RSocketAutoConfiguration {
|
||||
throw new UnsupportedOperationException("Supplier is not currently supported for RSocket interaction");
|
||||
}
|
||||
|
||||
if (StringUtils.hasText(rSocketFunctionProperties.getBindAddress())
|
||||
&& rSocketFunctionProperties.getBindPort() != null) {
|
||||
InetSocketAddress bindAddress = InetSocketAddress.createUnresolved(
|
||||
this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort());
|
||||
this.invocableFunction = new RSocketListenerFunction(function, bindAddress);
|
||||
}
|
||||
else {
|
||||
this.invocableFunction = new RSocketListenerFunction(function, null);
|
||||
}
|
||||
this.invocableFunction = new RSocketListenerFunction(function);
|
||||
}
|
||||
|
||||
RSocket getRSocket() {
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.springframework.cloud.function.context.FunctionProperties;
|
||||
* The prefix for these properties is `spring.cloud.function.rscocket`.
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Spencer Gibb
|
||||
* @since 3.1
|
||||
*/
|
||||
@ConfigurationProperties(prefix = FunctionProperties.PREFIX + ".rsocket")
|
||||
@@ -31,10 +32,6 @@ public class RSocketFunctionProperties {
|
||||
|
||||
private boolean enabled;
|
||||
|
||||
private String bindAddress;
|
||||
|
||||
private Integer bindPort;
|
||||
|
||||
public boolean isEnabled() {
|
||||
return this.enabled;
|
||||
}
|
||||
@@ -42,20 +39,4 @@ public class RSocketFunctionProperties {
|
||||
public void setEnabled(boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public String getBindAddress() {
|
||||
return bindAddress;
|
||||
}
|
||||
|
||||
public void setBindAddress(String bindAddress) {
|
||||
this.bindAddress = bindAddress;
|
||||
}
|
||||
|
||||
public Integer getBindPort() {
|
||||
return bindPort;
|
||||
}
|
||||
|
||||
public void setBindPort(Integer bindPort) {
|
||||
this.bindPort = bindPort;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,6 @@
|
||||
package org.springframework.cloud.function.rsocket;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.function.Function;
|
||||
|
||||
@@ -55,15 +54,12 @@ class RSocketListenerFunction implements Function<Message<byte[]>, Publisher<Mes
|
||||
|
||||
private static Log logger = LogFactory.getLog(RSocketListenerFunction.class);
|
||||
|
||||
private final InetSocketAddress listenAddress;
|
||||
|
||||
private final FunctionInvocationWrapper targetFunction;
|
||||
|
||||
private Disposable rsocketConnection;
|
||||
private RSocket rsocket;
|
||||
|
||||
RSocketListenerFunction(FunctionInvocationWrapper targetFunction, InetSocketAddress listenAddress) {
|
||||
this.listenAddress = listenAddress;
|
||||
RSocketListenerFunction(FunctionInvocationWrapper targetFunction) {
|
||||
this.targetFunction = targetFunction;
|
||||
}
|
||||
|
||||
@@ -71,7 +67,7 @@ class RSocketListenerFunction implements Function<Message<byte[]>, Publisher<Mes
|
||||
@Override
|
||||
public Publisher<Message<byte[]>> apply(Message<byte[]> input) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Executiing: " + this.targetFunction + " on " + this.listenAddress);
|
||||
logger.debug("Executiing: " + this.targetFunction);
|
||||
}
|
||||
|
||||
Object rawResult = this.targetFunction.apply(input);
|
||||
@@ -84,9 +80,6 @@ class RSocketListenerFunction implements Function<Message<byte[]>, Publisher<Mes
|
||||
if (rsocket == null) {
|
||||
rsocket = buildRSocket(this.targetFunction, functionType, this);
|
||||
}
|
||||
if (this.listenAddress != null) {
|
||||
this.rsocketConnection = RSocketConnectionUtils.createServerSocket(rsocket, this.listenAddress);
|
||||
}
|
||||
this.printSplashScreen(this.targetFunction.getFunctionDefinition(), functionType);
|
||||
}
|
||||
|
||||
|
||||
@@ -48,8 +48,7 @@ public class RSocketAutoConfigurationTests {
|
||||
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.cloud.function.definition=uppercase",
|
||||
"--spring.cloud.function.rsocket.bind-address=localhost",
|
||||
"--spring.cloud.function.rsocket.bind-port=" + port);
|
||||
"--spring.rsocket.server.port=" + port);
|
||||
|
||||
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
|
||||
Mono<String> result = socket.requestResponse(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8);
|
||||
@@ -67,8 +66,7 @@ public class RSocketAutoConfigurationTests {
|
||||
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.cloud.function.definition=uppercase",
|
||||
"--spring.cloud.function.rsocket.bind-address=localhost",
|
||||
"--spring.cloud.function.rsocket.bind-port=" + port);
|
||||
"--spring.rsocket.server.port=" + port);
|
||||
|
||||
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
|
||||
Flux<String> result = socket.requestStream(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8);
|
||||
@@ -86,8 +84,7 @@ public class RSocketAutoConfigurationTests {
|
||||
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.cloud.function.definition=uppercase",
|
||||
"--spring.cloud.function.rsocket.bind-address=localhost",
|
||||
"--spring.cloud.function.rsocket.bind-port=" + port);
|
||||
"--spring.rsocket.server.port=" + port);
|
||||
|
||||
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
|
||||
Flux<String> result = socket.requestChannel(Flux.just(
|
||||
@@ -111,8 +108,7 @@ public class RSocketAutoConfigurationTests {
|
||||
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.cloud.function.definition=uppercaseReactive",
|
||||
"--spring.cloud.function.rsocket.bind-address=localhost",
|
||||
"--spring.cloud.function.rsocket.bind-port=" + port);
|
||||
"--spring.rsocket.server.port=" + port);
|
||||
|
||||
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
|
||||
|
||||
@@ -131,8 +127,7 @@ public class RSocketAutoConfigurationTests {
|
||||
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.cloud.function.definition=uppercaseReactive",
|
||||
"--spring.cloud.function.rsocket.bind-address=localhost",
|
||||
"--spring.cloud.function.rsocket.bind-port=" + port);
|
||||
"--spring.rsocket.server.port=" + port);
|
||||
|
||||
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
|
||||
|
||||
@@ -151,8 +146,7 @@ public class RSocketAutoConfigurationTests {
|
||||
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.cloud.function.definition=uppercaseReactive",
|
||||
"--spring.cloud.function.rsocket.bind-address=localhost",
|
||||
"--spring.cloud.function.rsocket.bind-port=" + port);
|
||||
"--spring.rsocket.server.port=" + port);
|
||||
|
||||
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
|
||||
|
||||
@@ -179,14 +173,12 @@ public class RSocketAutoConfigurationTests {
|
||||
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.cloud.function.definition=uppercase|concat",
|
||||
"--spring.cloud.function.rsocket.bind-address=localhost",
|
||||
"--spring.cloud.function.rsocket.bind-port=" + portA);
|
||||
"--spring.rsocket.server.port=" + portA);
|
||||
|
||||
new SpringApplicationBuilder(AdditionalFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.cloud.function.definition=reverse>localhost:" + portA + "|wrap",
|
||||
"--spring.cloud.function.rsocket.bind-address=localhost",
|
||||
"--spring.cloud.function.rsocket.bind-port=" + portB);
|
||||
"--spring.rsocket.server.port=" + portB);
|
||||
|
||||
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", portB), null);
|
||||
Mono<String> result = socket.requestResponse(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8);
|
||||
@@ -203,8 +195,7 @@ public class RSocketAutoConfigurationTests {
|
||||
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.cloud.function.definition=uppercaseReactive",
|
||||
"--spring.cloud.function.rsocket.bind-address=localhost",
|
||||
"--spring.cloud.function.rsocket.bind-port=" + port);
|
||||
"--spring.rsocket.server.port=" + port);
|
||||
|
||||
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user