diff --git a/spring-cloud-function-rsocket/pom.xml b/spring-cloud-function-rsocket/pom.xml
index beec9dbc7..6610ca516 100644
--- a/spring-cloud-function-rsocket/pom.xml
+++ b/spring-cloud-function-rsocket/pom.xml
@@ -15,6 +15,10 @@
3.1.0-SNAPSHOT
+
+ 0.0.2-SNAPSHOT
+
+
@@ -35,19 +39,31 @@
org.springframework.boot
- spring-boot-starter-test
- test
+ spring-boot-configuration-processor
+ true
org.springframework.boot
- spring-boot-configuration-processor
- true
+ spring-boot-starter-test
+ test
io.projectreactor
reactor-test
test
+
+ io.rsocket.routing
+ rsocket-routing-client-spring
+ ${rsocket-routing.version}
+ test
+
+
+ io.rsocket.routing
+ rsocket-routing-broker-spring
+ ${rsocket-routing.version}
+ test
+
diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java
index 45c031782..8d2ac2d0e 100644
--- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java
+++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java
@@ -18,11 +18,16 @@ package org.springframework.cloud.function.rsocket;
import java.net.InetSocketAddress;
+import io.rsocket.RSocket;
+import io.rsocket.SocketAcceptor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.boot.autoconfigure.AutoConfigureBefore;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
@@ -36,28 +41,39 @@ import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.messaging.rsocket.RSocketConnectorConfigurer;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
- * Main configuration class for components required to support RSocket integration with spring-cloud-function.
+ * Main configuration class for components required to support RSocket integration with
+ * spring-cloud-function.
*
* @author Oleg Zhurakousky
* @since 3.1
*/
@Configuration(proxyBeanMethods = false)
-@EnableConfigurationProperties({FunctionProperties.class, RSocketFunctionProperties.class})
+@EnableConfigurationProperties({ FunctionProperties.class, RSocketFunctionProperties.class })
+@ConditionalOnProperty(name = FunctionProperties.PREFIX + ".rsocket.enabled", matchIfMissing = true)
+@AutoConfigureBefore(name = "io.rsocket.routing.client.spring.RoutingClientAutoConfiguration")
public class RSocketAutoConfiguration {
private static Log logger = LogFactory.getLog(RSocketAutoConfiguration.class);
-
@Bean
public FunctionToRSocketBinder functionToDestinationBinder(FunctionCatalog functionCatalog,
FunctionProperties functionProperties, RSocketFunctionProperties rSocketFunctionProperties) {
return new FunctionToRSocketBinder(functionCatalog, functionProperties, rSocketFunctionProperties);
}
+ @Bean
+ @ConditionalOnClass(name = "io.rsocket.routing.client.spring.RoutingClientAutoConfiguration")
+ //TODO: move to broker specific auto config
+ public RSocketConnectorConfigurer functionRSocketConnectorConfigurer(
+ FunctionToRSocketBinder binder) {
+ return connector -> connector.acceptor(SocketAcceptor.with(binder.getRSocket()));
+ }
+
/**
*
*/
@@ -81,6 +97,7 @@ public class RSocketAutoConfiguration {
this.functionProperties = functionProperties;
this.rSocketFunctionProperties = rSocketFunctionProperties;
}
+
@Override
public void afterPropertiesSet() throws Exception {
String definition = this.functionProperties.getDefinition();
@@ -92,16 +109,28 @@ public class RSocketAutoConfiguration {
}
Assert.isTrue(StringUtils.hasText(definition), "Failed to determine target function for RSocket.");
this.registerRsocketForwardingFunctionIfNecessary(definition);
- //TODO externalize content-type
+ // TODO externalize content-type
FunctionInvocationWrapper function = functionCatalog.lookup(definition, "application/json");
if (function.isSupplier()) {
throw new UnsupportedOperationException("Supplier is not currently supported for RSocket interaction");
}
- InetSocketAddress bindAddress = InetSocketAddress
- .createUnresolved(this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort());
+ if (StringUtils.hasText(rSocketFunctionProperties.getBindAddress())
+ && rSocketFunctionProperties.getBindPort() != null) {
+ InetSocketAddress bindAddress = InetSocketAddress.createUnresolved(
+ this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort());
+ this.invocableFunction = new RSocketListenerFunction(function, bindAddress);
+ }
+ else {
+ this.invocableFunction = new RSocketListenerFunction(function, null);
+ }
+ }
- this.invocableFunction = new RSocketListenerFunction(function, bindAddress);
+ RSocket getRSocket() {
+ if (this.invocableFunction == null) {
+ return null;
+ }
+ return this.invocableFunction.getRsocket();
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -115,16 +144,18 @@ public class RSocketAutoConfiguration {
}
String[] functionToRSocketDefinition = StringUtils.delimitedListToStringArray(name, ">");
Assert.isTrue(functionToRSocketDefinition.length == 2, "Must only contain one output redirect");
- FunctionInvocationWrapper function = functionCatalog.lookup(functionToRSocketDefinition[0], "application/json");
+ FunctionInvocationWrapper function = functionCatalog.lookup(functionToRSocketDefinition[0],
+ "application/json");
String[] hostPort = StringUtils.delimitedListToStringArray(functionToRSocketDefinition[1], ":");
- InetSocketAddress outputAddress = InetSocketAddress
- .createUnresolved(hostPort[0], Integer.valueOf(hostPort[1]));
+ InetSocketAddress outputAddress = InetSocketAddress.createUnresolved(hostPort[0],
+ Integer.valueOf(hostPort[1]));
RSocketForwardingFunction rsocketFunction = new RSocketForwardingFunction(function, outputAddress);
FunctionRegistration functionRegistration = new FunctionRegistration(rsocketFunction, name);
- functionRegistration.type(FunctionTypeUtils.discoverFunctionTypeFromClass(RSocketListenerFunction.class));
+ functionRegistration
+ .type(FunctionTypeUtils.discoverFunctionTypeFromClass(RSocketListenerFunction.class));
((FunctionRegistry) this.functionCatalog).register(functionRegistration);
}
}
@@ -141,15 +172,19 @@ public class RSocketAutoConfiguration {
this.invocableFunction.start();
}
}
+
@Override
public void stop() {
if (this.isRunning() && this.invocableFunction != null) {
this.invocableFunction.stop();
}
}
+
@Override
public boolean isRunning() {
return this.started;
}
+
}
+
}
diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java
index d7848679f..5b2ce3d11 100644
--- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java
+++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java
@@ -29,9 +29,19 @@ import org.springframework.cloud.function.context.FunctionProperties;
@ConfigurationProperties(prefix = FunctionProperties.PREFIX + ".rsocket")
public class RSocketFunctionProperties {
- private String bindAddress = "localhost";
+ private boolean enabled;
- private int bindPort = 55555;
+ private String bindAddress;
+
+ private Integer bindPort;
+
+ public boolean isEnabled() {
+ return this.enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
public String getBindAddress() {
return bindAddress;
@@ -41,11 +51,11 @@ public class RSocketFunctionProperties {
this.bindAddress = bindAddress;
}
- public int getBindPort() {
+ public Integer getBindPort() {
return bindPort;
}
- public void setBindPort(int bindPort) {
+ public void setBindPort(Integer bindPort) {
this.bindPort = bindPort;
}
}
diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java
index c55fccc9d..10ea5ee79 100644
--- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java
+++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java
@@ -60,6 +60,7 @@ public class RSocketListenerFunction implements Function, Publis
private final FunctionInvocationWrapper targetFunction;
private Disposable rsocketConnection;
+ private RSocket rsocket;
RSocketListenerFunction(FunctionInvocationWrapper targetFunction, InetSocketAddress listenAddress) {
this.listenAddress = listenAddress;
@@ -80,7 +81,9 @@ public class RSocketListenerFunction implements Function, Publis
void start() {
Type functionType = this.targetFunction.getFunctionType();
- RSocket rsocket = buildRSocket(this.targetFunction.getFunctionDefinition(), functionType, this);
+ if (rsocket == null) {
+ rsocket = buildRSocket(this.targetFunction, functionType, this);
+ }
if (this.listenAddress != null) {
this.rsocketConnection = RSocketConnectionUtils.createServerSocket(rsocket, this.listenAddress);
this.printSplashScreen(this.targetFunction.getFunctionDefinition(), functionType);
@@ -93,7 +96,15 @@ public class RSocketListenerFunction implements Function, Publis
}
}
- private RSocket buildRSocket(String definition, Type functionType, Function, Publisher>> function) {
+ public RSocket getRsocket() {
+ if (this.rsocket == null) {
+ start();
+ }
+ return this.rsocket;
+ }
+
+ private RSocket buildRSocket(FunctionInvocationWrapper targetFunction, Type functionType, Function, Publisher>> function) {
+ String definition = targetFunction.getFunctionDefinition();
RSocket clientRSocket = new RSocket() { // imperative function or Function, Mono> = requestResponse
@Override
public Mono requestResponse(Payload payload) {
diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RoutingBrokerTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RoutingBrokerTests.java
new file mode 100644
index 000000000..81ca2272d
--- /dev/null
+++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RoutingBrokerTests.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.rsocket;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.core.RSocketConnector;
+import io.rsocket.frame.decoder.PayloadDecoder;
+import io.rsocket.metadata.CompositeMetadataCodec;
+import io.rsocket.routing.broker.spring.MimeTypes;
+import io.rsocket.routing.common.Id;
+import io.rsocket.routing.common.Tags;
+import io.rsocket.routing.common.WellKnownKey;
+import io.rsocket.routing.frames.AddressFlyweight;
+import io.rsocket.routing.frames.RouteSetupFlyweight;
+import io.rsocket.transport.netty.client.TcpClientTransport;
+import io.rsocket.util.DefaultPayload;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.rsocket.RSocketStrategies;
+import org.springframework.util.SocketUtils;
+
+import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
+import static io.rsocket.routing.broker.spring.MimeTypes.COMPOSITE_MIME_TYPE;
+
+
+/**
+ *
+ * @author Oleg Zhurakousky
+ * @since 3.1
+ */
+public class RoutingBrokerTests {
+
+ @Test
+ public void testImperativeFunctionAsRequestReply() throws Exception {
+
+ ConfigurableApplicationContext functionContext = null;
+ ConfigurableApplicationContext brokerContext = null;
+ try {
+ int routingBrokerProxyPort = SocketUtils.findAvailableTcpPort();
+ int routingBrokerClusterPort = SocketUtils.findAvailableTcpPort();
+
+ // start broker
+ brokerContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
+ "--logging.level.io.rsocket.routing.broker=TRACE",
+ "--spring.cloud.function.rsocket.enabled=false",
+ "--io.rsocket.routing.client.enabled=false",
+ "--io.rsocket.routing.broker.enabled=true",
+ "--io.rsocket.routing.broker.tcp.port=" + routingBrokerProxyPort,
+ "--io.rsocket.routing.broker.cluster.port=" + routingBrokerClusterPort);
+
+ // start function connecting to broker, service-name=toupper
+ functionContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class)
+ .web(WebApplicationType.NONE).run(
+ "--logging.level.org.springframework.cloud.function=DEBUG",
+ "--io.rsocket.routing.client.enabled=true",
+ "--io.rsocket.routing.client.service-name=toupper",
+ "--io.rsocket.routing.client.brokers[0].host=localhost",
+ "--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort,
+ "--io.rsocket.routing.broker.enabled=false",
+ "--spring.cloud.function.definition=uppercase");
+
+ // setup metadata to identify the this test connecting to broker.
+ RSocketStrategies strategies = functionContext.getBean(RSocketStrategies.class);
+ Id testerId = Id.random();
+ ByteBuf routeSetup = encodeRouteSetup(strategies, testerId, "tester");
+ Payload setupPayload = DefaultPayload.create(EMPTY_BUFFER, routeSetup);
+
+ // connect to broker
+ RSocket socket = RSocketConnector.create().payloadDecoder(PayloadDecoder.ZERO_COPY)
+ .metadataMimeType(COMPOSITE_MIME_TYPE.toString())
+ .setupPayload(setupPayload)
+ .connect(TcpClientTransport.create(routingBrokerProxyPort))
+ .block();
+
+ // setup data for request to toupper service
+ ByteBuffer data = StandardCharsets.UTF_8.encode(CharBuffer.wrap("\"hello\""));
+ // setup metadata for request to toupper service
+ ByteBuf routingMetadata = encodeAddress(strategies, testerId, "toupper");
+ Payload payload = DefaultPayload.create(data, routingMetadata.nioBuffer());
+ // call toupper service
+ Mono result = socket.requestResponse(payload).map(Payload::getDataUtf8);
+
+ StepVerifier
+ .create(result)
+ .expectNext("\"HELLO\"")
+ .expectComplete()
+ .verify();
+ } finally {
+ if (functionContext != null) {
+ functionContext.close();
+ }
+ if (brokerContext != null) {
+ brokerContext.close();
+ }
+ }
+ }
+
+ static ByteBuf encodeAddress(RSocketStrategies strategies, Id originRouteId, String serviceName) {
+ Tags tags = Tags.builder().with(WellKnownKey.SERVICE_NAME, serviceName)
+ .buildTags();
+ ByteBuf address = AddressFlyweight
+ .encode(ByteBufAllocator.DEFAULT, originRouteId, Tags.empty(), tags);
+
+ CompositeByteBuf composite = encodeComposite(address, MimeTypes.ROUTING_FRAME_MIME_TYPE
+ .toString());
+ return composite;
+ }
+
+ private static ByteBuf encodeRouteSetup(RSocketStrategies strategies, Id routeId, String serviceName) {
+ Tags tags = Tags.builder()
+ .with("current-time", String.valueOf(System.currentTimeMillis()))
+ .with(WellKnownKey.TIME_ZONE, System.currentTimeMillis() + "")
+ .buildTags();
+ ByteBuf routeSetup = RouteSetupFlyweight
+ .encode(ByteBufAllocator.DEFAULT, routeId, serviceName, tags);
+
+ CompositeByteBuf composite = encodeComposite(routeSetup, MimeTypes.ROUTING_FRAME_MIME_TYPE
+ .toString());
+ return composite;
+ }
+
+ private static CompositeByteBuf encodeComposite(ByteBuf byteBuf, String mimeType) {
+ CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer();
+ CompositeMetadataCodec
+ .encodeAndAddMetadata(composite, ByteBufAllocator.DEFAULT,
+ mimeType, byteBuf);
+ return composite;
+ }
+
+ @EnableAutoConfiguration
+ @Configuration
+ public static class RoutingBrokerConfiguration {
+
+ }
+
+ @EnableAutoConfiguration
+ @Configuration
+ public static class SampleFunctionConfiguration {
+ @Bean
+ public Function uppercase() {
+ return v -> {
+ return v.toUpperCase();
+ };
+ }
+
+ @Bean
+ public Function concat() {
+ return v -> {
+ return v + v;
+ };
+ }
+
+ @Bean
+ public Function echo() {
+ return v -> v;
+ }
+
+ @Bean
+ public Function, Flux> uppercaseReactive() {
+ return flux -> flux.map(v -> {
+ System.out.println("Uppercasing: " + v);
+ return v.toUpperCase();
+ });
+ }
+
+ @Bean
+ public Consumer log() {
+ return v -> {
+ System.out.println("==> In Consumer: " + new String(v));
+ };
+ }
+ }
+
+ @EnableAutoConfiguration
+ @Configuration
+ public static class AdditionalFunctionConfiguration {
+ @Bean
+ public Function reverse() {
+ return v -> {
+ return new StringBuilder(v).reverse().toString();
+ };
+ }
+
+ @Bean
+ public Function wrap() {
+ return v -> {
+ return "(" + v + ")";
+ };
+ }
+ }
+}
diff --git a/spring-cloud-function-rsocket/src/test/resources/application.properties b/spring-cloud-function-rsocket/src/test/resources/application.properties
new file mode 100644
index 000000000..7a1ee9c3e
--- /dev/null
+++ b/spring-cloud-function-rsocket/src/test/resources/application.properties
@@ -0,0 +1,2 @@
+io.rsocket.routing.broker.enabled=false
+io.rsocket.routing.client.enabled=false