Remove custom server creation logic and rely on boot instead

Resolves #579
This commit is contained in:
Oleg Zhurakousky
2020-08-26 10:01:43 +02:00
parent 30572cf0fc
commit c1240ebb91
4 changed files with 28 additions and 63 deletions

View File

@@ -19,13 +19,17 @@ package org.springframework.cloud.function.rsocket;
import java.net.InetSocketAddress;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.rsocket.context.RSocketServerBootstrap;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistration;
@@ -57,8 +61,16 @@ class RSocketAutoConfiguration {
@Bean
public FunctionToRSocketBinder functionToDestinationBinder(FunctionCatalog functionCatalog,
FunctionProperties functionProperties, RSocketFunctionProperties rSocketFunctionProperties) {
return new FunctionToRSocketBinder(functionCatalog, functionProperties, rSocketFunctionProperties);
FunctionProperties functionProperties) {
return new FunctionToRSocketBinder(functionCatalog, functionProperties);
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty("spring.rsocket.server.port")
RSocketServerBootstrap rSocketServerBootstrap(RSocketServerFactory rSocketServerFactory,
FunctionToRSocketBinder binder) {
return new RSocketServerBootstrap(rSocketServerFactory, SocketAcceptor.with(binder.getRSocket()));
}
/**
@@ -70,19 +82,15 @@ class RSocketAutoConfiguration {
private final FunctionProperties functionProperties;
private final RSocketFunctionProperties rSocketFunctionProperties;
private RSocketListenerFunction invocableFunction;
private GenericApplicationContext context;
private boolean started;
FunctionToRSocketBinder(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
RSocketFunctionProperties rSocketFunctionProperties) {
FunctionToRSocketBinder(FunctionCatalog functionCatalog, FunctionProperties functionProperties) {
this.functionCatalog = functionCatalog;
this.functionProperties = functionProperties;
this.rSocketFunctionProperties = rSocketFunctionProperties;
}
@Override
@@ -102,15 +110,7 @@ class RSocketAutoConfiguration {
throw new UnsupportedOperationException("Supplier is not currently supported for RSocket interaction");
}
if (StringUtils.hasText(rSocketFunctionProperties.getBindAddress())
&& rSocketFunctionProperties.getBindPort() != null) {
InetSocketAddress bindAddress = InetSocketAddress.createUnresolved(
this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort());
this.invocableFunction = new RSocketListenerFunction(function, bindAddress);
}
else {
this.invocableFunction = new RSocketListenerFunction(function, null);
}
this.invocableFunction = new RSocketListenerFunction(function);
}
RSocket getRSocket() {

View File

@@ -24,6 +24,7 @@ import org.springframework.cloud.function.context.FunctionProperties;
* The prefix for these properties is `spring.cloud.function.rscocket`.
*
* @author Oleg Zhurakousky
* @author Spencer Gibb
* @since 3.1
*/
@ConfigurationProperties(prefix = FunctionProperties.PREFIX + ".rsocket")
@@ -31,10 +32,6 @@ public class RSocketFunctionProperties {
private boolean enabled;
private String bindAddress;
private Integer bindPort;
public boolean isEnabled() {
return this.enabled;
}
@@ -42,20 +39,4 @@ public class RSocketFunctionProperties {
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getBindAddress() {
return bindAddress;
}
public void setBindAddress(String bindAddress) {
this.bindAddress = bindAddress;
}
public Integer getBindPort() {
return bindPort;
}
public void setBindPort(Integer bindPort) {
this.bindPort = bindPort;
}
}

View File

@@ -17,7 +17,6 @@
package org.springframework.cloud.function.rsocket;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.function.Function;
@@ -55,15 +54,12 @@ class RSocketListenerFunction implements Function<Message<byte[]>, Publisher<Mes
private static Log logger = LogFactory.getLog(RSocketListenerFunction.class);
private final InetSocketAddress listenAddress;
private final FunctionInvocationWrapper targetFunction;
private Disposable rsocketConnection;
private RSocket rsocket;
RSocketListenerFunction(FunctionInvocationWrapper targetFunction, InetSocketAddress listenAddress) {
this.listenAddress = listenAddress;
RSocketListenerFunction(FunctionInvocationWrapper targetFunction) {
this.targetFunction = targetFunction;
}
@@ -71,7 +67,7 @@ class RSocketListenerFunction implements Function<Message<byte[]>, Publisher<Mes
@Override
public Publisher<Message<byte[]>> apply(Message<byte[]> input) {
if (logger.isDebugEnabled()) {
logger.debug("Executiing: " + this.targetFunction + " on " + this.listenAddress);
logger.debug("Executiing: " + this.targetFunction);
}
Object rawResult = this.targetFunction.apply(input);
@@ -84,9 +80,6 @@ class RSocketListenerFunction implements Function<Message<byte[]>, Publisher<Mes
if (rsocket == null) {
rsocket = buildRSocket(this.targetFunction, functionType, this);
}
if (this.listenAddress != null) {
this.rsocketConnection = RSocketConnectionUtils.createServerSocket(rsocket, this.listenAddress);
}
this.printSplashScreen(this.targetFunction.getFunctionDefinition(), functionType);
}

View File

@@ -48,8 +48,7 @@ public class RSocketAutoConfigurationTests {
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.function.rsocket.bind-address=localhost",
"--spring.cloud.function.rsocket.bind-port=" + port);
"--spring.rsocket.server.port=" + port);
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
Mono<String> result = socket.requestResponse(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8);
@@ -67,8 +66,7 @@ public class RSocketAutoConfigurationTests {
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.function.rsocket.bind-address=localhost",
"--spring.cloud.function.rsocket.bind-port=" + port);
"--spring.rsocket.server.port=" + port);
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
Flux<String> result = socket.requestStream(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8);
@@ -86,8 +84,7 @@ public class RSocketAutoConfigurationTests {
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.function.rsocket.bind-address=localhost",
"--spring.cloud.function.rsocket.bind-port=" + port);
"--spring.rsocket.server.port=" + port);
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
Flux<String> result = socket.requestChannel(Flux.just(
@@ -111,8 +108,7 @@ public class RSocketAutoConfigurationTests {
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.definition=uppercaseReactive",
"--spring.cloud.function.rsocket.bind-address=localhost",
"--spring.cloud.function.rsocket.bind-port=" + port);
"--spring.rsocket.server.port=" + port);
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
@@ -131,8 +127,7 @@ public class RSocketAutoConfigurationTests {
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.definition=uppercaseReactive",
"--spring.cloud.function.rsocket.bind-address=localhost",
"--spring.cloud.function.rsocket.bind-port=" + port);
"--spring.rsocket.server.port=" + port);
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
@@ -151,8 +146,7 @@ public class RSocketAutoConfigurationTests {
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.definition=uppercaseReactive",
"--spring.cloud.function.rsocket.bind-address=localhost",
"--spring.cloud.function.rsocket.bind-port=" + port);
"--spring.rsocket.server.port=" + port);
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);
@@ -179,14 +173,12 @@ public class RSocketAutoConfigurationTests {
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.definition=uppercase|concat",
"--spring.cloud.function.rsocket.bind-address=localhost",
"--spring.cloud.function.rsocket.bind-port=" + portA);
"--spring.rsocket.server.port=" + portA);
new SpringApplicationBuilder(AdditionalFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.definition=reverse>localhost:" + portA + "|wrap",
"--spring.cloud.function.rsocket.bind-address=localhost",
"--spring.cloud.function.rsocket.bind-port=" + portB);
"--spring.rsocket.server.port=" + portB);
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", portB), null);
Mono<String> result = socket.requestResponse(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8);
@@ -203,8 +195,7 @@ public class RSocketAutoConfigurationTests {
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.definition=uppercaseReactive",
"--spring.cloud.function.rsocket.bind-address=localhost",
"--spring.cloud.function.rsocket.bind-port=" + port);
"--spring.rsocket.server.port=" + port);
RSocket socket = RSocketConnectionUtils.createClientSocket(InetSocketAddress.createUnresolved("localhost", port), null);