diff --git a/pom.xml b/pom.xml
index afbf29902..86f9bfd9a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,7 @@
spring-cloud-function-deployer
spring-cloud-function-adapters
spring-cloud-function-kotlin
+ spring-cloud-function-rsocket
docs
diff --git a/spring-cloud-function-rsocket/.jdk8 b/spring-cloud-function-rsocket/.jdk8
new file mode 100644
index 000000000..e69de29bb
diff --git a/spring-cloud-function-rsocket/NOTES.txt b/spring-cloud-function-rsocket/NOTES.txt
new file mode 100644
index 000000000..60f6788f7
--- /dev/null
+++ b/spring-cloud-function-rsocket/NOTES.txt
@@ -0,0 +1,7 @@
+
+spring.cloud.function.rsocket.bind-address=localhost
+spring.cloud.function.rsocket.bind-port=1234
+
+spring.cloud.function.rsocket.target-address=localhost
+spring.cloud.function.rsocket.target-port=1235
+
diff --git a/spring-cloud-function-rsocket/pom.xml b/spring-cloud-function-rsocket/pom.xml
new file mode 100644
index 000000000..7b6a57766
--- /dev/null
+++ b/spring-cloud-function-rsocket/pom.xml
@@ -0,0 +1,64 @@
+
+
+ 4.0.0
+
+ spring-cloud-function-rsocket
+ jar
+ Spring Cloud Function RSocket Support
+ Spring Cloud Function RSocket Support
+
+
+ org.springframework.cloud
+ spring-cloud-function-parent
+ 3.1.0-SNAPSHOT
+
+
+
+
+
+ io.rsocket
+ rsocket-core
+
+
+ io.rsocket
+ rsocket-transport-netty
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.springframework.cloud
+ spring-cloud-function-context
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+ true
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ org.springframework.boot.experimental
+ spring-boot-thin-layout
+ ${wrapper.version}
+
+
+
+
+
+
+
diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java
new file mode 100644
index 000000000..dda64e476
--- /dev/null
+++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.rsocket;
+
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import java.util.function.Function;
+
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.SocketAcceptor;
+import io.rsocket.core.RSocketServer;
+import io.rsocket.transport.netty.server.TcpServerTransport;
+import io.rsocket.util.DefaultPayload;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import reactor.core.publisher.Mono;
+
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.function.context.FunctionCatalog;
+import org.springframework.cloud.function.context.FunctionProperties;
+import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
+import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+
+/**
+ * Main configuration class for components required to support RSocket integration with spring-cloud-function.
+ *
+ * @author Oleg Zhurakousky
+ * @since 3.1
+ */
+@Configuration(proxyBeanMethods = false)
+@EnableConfigurationProperties({FunctionProperties.class, RSocketFunctionProperties.class})
+public class RSocketAutoConfiguration {
+
+ private static Log logger = LogFactory.getLog(RSocketAutoConfiguration.class);
+
+ @Bean
+ public FunctionToDestinationBinder functionToDestinationBinder(FunctionCatalog functionCatalog,
+ FunctionProperties functionProperties, RSocketFunctionProperties rSocketFunctionProperties) {
+ return new FunctionToDestinationBinder(functionCatalog, functionProperties, rSocketFunctionProperties);
+ }
+
+
+ @SuppressWarnings("rawtypes")
+ private static RSocket buildRSocket(String definition, Type functionType, Function function) {
+ RSocket clientRSocket = null;
+// if (isFireAndForget(functionType)) { // fire-and-forget
+// RSocket rsocket = new RSocket() { // imperative function or Function, Mono> = requestResponse
+// @Override
+// public Mono fireAndForget(Payload p) {
+// System.out.println("Invoking fireAndForget");
+// invokeFunction(p, function);
+// return Mono.empty();
+// }
+// };
+// return rsocket;
+// }
+ if (isRequestRepply(functionType)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Mapping function '" + definition + "' as RSocket `requestResponse`.");
+ }
+
+ clientRSocket = new RSocket() { // imperative function or Function, Mono> = requestResponse
+ @SuppressWarnings("unchecked")
+ @Override
+ public Mono requestResponse(Payload payload) {
+ Object result = invokeFunction(payload, function);
+ Mono invocationResult = ((Mono>) result).map(message -> DefaultPayload.create(message.getPayload()));
+ return invocationResult;
+ }
+ };
+ }
+ else {
+ throw new UnsupportedOperationException("Only RSocket 'requestResponse' is currently supported");
+ }
+ Assert.notNull(clientRSocket, "Failed to create RSocket for function '" + definition + "'");
+ return clientRSocket;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Mono> invokeFunction(Payload p, Function, Mono>> function) {
+ ByteBuffer buffer = p.getData();
+ byte[] rawData = new byte[buffer.remaining()];
+ buffer.get(rawData);
+ if (p.hasMetadata()) {
+ String metadata = p.getMetadataUtf8(); // TODO see what to do with it
+ }
+ Message inputMessage = MessageBuilder.withPayload(rawData).build();
+ Object result = function.apply(inputMessage);
+ return result instanceof Mono ? (Mono>) result : Mono.just((Message) result);
+ }
+
+ private static boolean isFireAndForget(Type functionType) {
+ Type inputType = FunctionTypeUtils.getInputType(functionType, 0);
+ return FunctionTypeUtils.isConsumer(functionType) && !FunctionTypeUtils.isPublisher(inputType);
+ }
+
+ private static boolean isRequestRepply(Type functionType) {
+ Type inputType = FunctionTypeUtils.getInputType(functionType, 0);
+ Type outputType = FunctionTypeUtils.getOutputType(functionType, 0);
+ return !FunctionTypeUtils.isPublisher(inputType) && (!FunctionTypeUtils.isPublisher(outputType) || FunctionTypeUtils.isMono(outputType));
+ }
+
+ /**
+ *
+ */
+ private static class FunctionToDestinationBinder implements InitializingBean {
+
+ private final FunctionCatalog functionCatalog;
+
+ private final FunctionProperties functionProperties;
+
+ private final RSocketFunctionProperties rSocketFunctionProperties;
+
+ FunctionToDestinationBinder(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
+ RSocketFunctionProperties rSocketFunctionProperties) {
+ this.functionCatalog = functionCatalog;
+ this.functionProperties = functionProperties;
+ this.rSocketFunctionProperties = rSocketFunctionProperties;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ String definition = this.functionProperties.getDefinition();
+ //TODO externalize content-type
+ FunctionInvocationWrapper function = functionCatalog.lookup(definition, "application/json");
+ if (function.isSupplier()) {
+ throw new UnsupportedOperationException("Supplier is not currently supported for RSocket interaction");
+ }
+
+ Function invocableFunction = StringUtils.hasText(this.rSocketFunctionProperties.getTargetAddress())
+ ? new RSocketFunction(this.rSocketFunctionProperties.getTargetAddress(), this.rSocketFunctionProperties.getTargetPort(), function)
+ : function;
+
+ Type functionType = function.getFunctionType();
+ RSocket rsocket = buildRSocket(definition, functionType, invocableFunction);
+ RSocketServer.create(SocketAcceptor.with(rsocket))
+ .bind(TcpServerTransport.create(this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort()))
+ .subscribe(); // TODO do we need to close/dispose etc?
+ }
+
+ }
+}
diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunction.java
new file mode 100644
index 000000000..417391670
--- /dev/null
+++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunction.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.rsocket;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Function;
+
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.core.RSocketConnector;
+import io.rsocket.transport.netty.client.TcpClientTransport;
+import io.rsocket.util.DefaultPayload;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+
+/**
+ * Wrapper over an instance of target Function (represented by {@link FunctionInvocationWrapper})
+ * which will use the result of the invocation of such function as an input to another RSocket
+ * effectively composing two functions over RSocket.
+ *
+ * @author Oleg Zhurakousky
+ * @since 3.1
+ */
+class RSocketFunction implements Function, Mono>> {
+
+ private final String bindAddress;
+
+ private final int port;
+
+ private final FunctionInvocationWrapper function;
+
+ private final RSocket rSocket;
+
+ RSocketFunction(String bindAddress, int port, FunctionInvocationWrapper function) {
+ this.bindAddress = bindAddress;
+ this.port = port;
+ this.function = function;
+ this.rSocket = RSocketConnector.connectWith(TcpClientTransport.create(this.bindAddress, this.port))
+ .log()
+ .retryWhen(Retry.backoff(5, Duration.ofSeconds(1)))
+ .block();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Mono> apply(Message input) {
+ Message result = (Message) function.apply(input);
+ Mono> resultMessage = null;
+ if (result != null) {
+ resultMessage = this.rSocket
+ .requestResponse(DefaultPayload.create(result.getPayload()))
+ .map(this::buildResultMessage);
+ }
+ return resultMessage;
+ }
+
+ private Message buildResultMessage(Payload payload) {
+ ByteBuffer payloadBuffer = payload.getData();
+ byte[] payloadData = new byte[payloadBuffer.remaining()];
+ payloadBuffer.get(payloadData);
+
+// ByteBuffer headersBuffer = responsePayload.getMetadata();
+// byte[] rawData = new byte[payloadBuffer.remaining()];
+// payloadBuffer.get(rawData);
+ return MessageBuilder.withPayload(payloadData).build();
+ }
+}
diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java
new file mode 100644
index 000000000..228c271e3
--- /dev/null
+++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.rsocket;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.cloud.function.context.FunctionProperties;
+
+/**
+ * Main configuration properties for RSocket integration with spring-cloud-function.
+ * The prefix for these properties is `spring.cloud.function.rscocket`.
+ *
+ * @author Oleg Zhurakousky
+ * @since 3.1
+ */
+@ConfigurationProperties(prefix = FunctionProperties.PREFIX + ".rsocket")
+public class RSocketFunctionProperties {
+
+ private String bindAddress;
+
+ private int bindPort;
+
+ private String targetAddress;
+
+ private int targetPort;
+
+ public String getBindAddress() {
+ return bindAddress;
+ }
+
+ public void setBindAddress(String bindAddress) {
+ this.bindAddress = bindAddress;
+ }
+
+ public int getBindPort() {
+ return bindPort;
+ }
+
+ public void setBindPort(int bindPort) {
+ this.bindPort = bindPort;
+ }
+
+ public String getTargetAddress() {
+ return targetAddress;
+ }
+
+ public void setTargetAddress(String targetAddress) {
+ this.targetAddress = targetAddress;
+ }
+
+ public int getTargetPort() {
+ return targetPort;
+ }
+
+ public void setTargetPort(int targetPort) {
+ this.targetPort = targetPort;
+ }
+}
diff --git a/spring-cloud-function-rsocket/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-cloud-function-rsocket/src/main/resources/META-INF/additional-spring-configuration-metadata.json
new file mode 100644
index 000000000..217d1affe
--- /dev/null
+++ b/spring-cloud-function-rsocket/src/main/resources/META-INF/additional-spring-configuration-metadata.json
@@ -0,0 +1,11 @@
+{
+ "properties": [
+ {
+ "name": "spring.cloud.function.web.path",
+ "type": "java.lang.String",
+ "description": "Path to web resources for functions (should start with / if not empty).",
+ "defaultValue": ""
+ }
+ ]
+}
+
diff --git a/spring-cloud-function-rsocket/src/main/resources/META-INF/spring.factories b/spring-cloud-function-rsocket/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000..a1476973a
--- /dev/null
+++ b/spring-cloud-function-rsocket/src/main/resources/META-INF/spring.factories
@@ -0,0 +1 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframework.cloud.function.rsocket.RSocketAutoConfiguration
diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java
new file mode 100644
index 000000000..97278aacd
--- /dev/null
+++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.rsocket;
+
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.core.RSocketConnector;
+import io.rsocket.transport.netty.client.TcpClientTransport;
+import io.rsocket.util.DefaultPayload;
+import org.junit.jupiter.api.Test;
+import reactor.util.retry.Retry;
+
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ *
+ * @author Oleg Zhurakousky
+ * @since 3.1
+ */
+public class RSocketAutoConfigurationTests {
+
+ @Test
+ public void testRequestReplyFunction() throws Exception {
+ new SpringApplicationBuilder(SampleFunctionConfiguration.class).run(
+ "--logging.level.org.springframework.cloud.function=DEBUG",
+ "--spring.cloud.function.definition=uppercase",
+ "--spring.cloud.function.rsocket.bind-address=localhost",
+ "--spring.cloud.function.rsocket.bind-port=12345");
+
+ RSocket socket = RSocketConnector.connectWith(TcpClientTransport.create("localhost", 12345)).log()
+ .retryWhen(Retry.backoff(5, Duration.ofSeconds(1))).block();
+ String result = socket.requestResponse(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8).block();
+
+ assertThat(result).isEqualTo("\"HELLO\"");
+ }
+
+ @Test
+ public void testRequestReplyFunctionWithComposition() throws Exception {
+ new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
+ "--logging.level.org.springframework.cloud.function=DEBUG",
+ "--spring.cloud.function.definition=uppercase",
+ "--spring.cloud.function.rsocket.bind-address=localhost",
+ "--spring.cloud.function.rsocket.bind-port=12345");
+
+ new SpringApplicationBuilder(AdditionalFunctionConfiguration.class).web(WebApplicationType.NONE).run(
+ "--logging.level.org.springframework.cloud.function=DEBUG",
+ "--spring.cloud.function.definition=reverse", "--spring.cloud.function.rsocket.bind-address=localhost",
+ "--spring.cloud.function.rsocket.bind-port=12346",
+ "--spring.cloud.function.rsocket.target-address=localhost",
+ "--spring.cloud.function.rsocket.target-port=12345");
+
+ RSocket socket = RSocketConnector.connectWith(TcpClientTransport.create("localhost", 12346)).log()
+ .retryWhen(Retry.backoff(5, Duration.ofSeconds(1))).block();
+ String result = socket.requestResponse(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8).block();
+
+ assertThat(result).isEqualTo("\"OLLEH\"");
+ }
+
+// @Test
+// public void testFireAndForgetConsumer() throws Exception {
+// new SpringApplicationBuilder(SampleFunctionConfiguration.class)
+// .run("--logging.level.org.springframework.cloud.function=DEBUG",
+// "--spring.cloud.function.definition=log");
+//
+// RSocket socket = RSocketConnector.connectWith(TcpClientTransport.create("localhost", 7000))
+// .log()
+// .retryWhen(Retry.backoff(5, Duration.ofSeconds(1)))
+// .block();
+// socket.fireAndForget(DefaultPayload.create("Hello"))
+// .log()
+// .onErrorContinue((e, x) -> {
+// System.out.println(e);
+// })
+// .block();
+// Thread.sleep(2000);
+// System.out.println();
+// }
+
+ @EnableAutoConfiguration
+ @Configuration
+ public static class SampleFunctionConfiguration {
+ @Bean
+ public Function uppercase() {
+ return v -> v.toUpperCase();
+ }
+
+ @Bean
+ public Consumer log() {
+ return v -> {
+ System.out.println("==> In Consumer: " + new String(v));
+ };
+ }
+ }
+
+ @EnableAutoConfiguration
+ @Configuration
+ public static class AdditionalFunctionConfiguration {
+ @Bean
+ public Function reverse() {
+ return v -> new StringBuilder(v).reverse().toString();
+ }
+ }
+}