From c91e2a00e4c45535a903469828029c32fa8723de Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 29 Jun 2020 14:44:29 +0200 Subject: [PATCH] Initial Committ of RSocket integration --- pom.xml | 1 + spring-cloud-function-rsocket/.jdk8 | 0 spring-cloud-function-rsocket/NOTES.txt | 7 + spring-cloud-function-rsocket/pom.xml | 64 +++++++ .../rsocket/RSocketAutoConfiguration.java | 165 ++++++++++++++++++ .../function/rsocket/RSocketFunction.java | 86 +++++++++ .../rsocket/RSocketFunctionProperties.java | 71 ++++++++ ...itional-spring-configuration-metadata.json | 11 ++ .../main/resources/META-INF/spring.factories | 1 + .../RSocketAutoConfigurationTests.java | 127 ++++++++++++++ 10 files changed, 533 insertions(+) create mode 100644 spring-cloud-function-rsocket/.jdk8 create mode 100644 spring-cloud-function-rsocket/NOTES.txt create mode 100644 spring-cloud-function-rsocket/pom.xml create mode 100644 spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java create mode 100644 spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunction.java create mode 100644 spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java create mode 100644 spring-cloud-function-rsocket/src/main/resources/META-INF/additional-spring-configuration-metadata.json create mode 100644 spring-cloud-function-rsocket/src/main/resources/META-INF/spring.factories create mode 100644 spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java diff --git a/pom.xml b/pom.xml index afbf29902..86f9bfd9a 100644 --- a/pom.xml +++ b/pom.xml @@ -60,6 +60,7 @@ spring-cloud-function-deployer spring-cloud-function-adapters spring-cloud-function-kotlin + spring-cloud-function-rsocket docs diff --git a/spring-cloud-function-rsocket/.jdk8 b/spring-cloud-function-rsocket/.jdk8 new file mode 100644 index 000000000..e69de29bb diff --git a/spring-cloud-function-rsocket/NOTES.txt b/spring-cloud-function-rsocket/NOTES.txt new file mode 100644 index 000000000..60f6788f7 --- /dev/null +++ b/spring-cloud-function-rsocket/NOTES.txt @@ -0,0 +1,7 @@ + +spring.cloud.function.rsocket.bind-address=localhost +spring.cloud.function.rsocket.bind-port=1234 + +spring.cloud.function.rsocket.target-address=localhost +spring.cloud.function.rsocket.target-port=1235 + diff --git a/spring-cloud-function-rsocket/pom.xml b/spring-cloud-function-rsocket/pom.xml new file mode 100644 index 000000000..7b6a57766 --- /dev/null +++ b/spring-cloud-function-rsocket/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + + spring-cloud-function-rsocket + jar + Spring Cloud Function RSocket Support + Spring Cloud Function RSocket Support + + + org.springframework.cloud + spring-cloud-function-parent + 3.1.0-SNAPSHOT + + + + + + io.rsocket + rsocket-core + + + io.rsocket + rsocket-transport-netty + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.cloud + spring-cloud-function-context + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.springframework.boot.experimental + spring-boot-thin-layout + ${wrapper.version} + + + + + + + diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java new file mode 100644 index 000000000..dda64e476 --- /dev/null +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java @@ -0,0 +1,165 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.rsocket; + +import java.lang.reflect.Type; +import java.nio.ByteBuffer; +import java.util.function.Function; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketServer; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Mono; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.FunctionProperties; +import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * Main configuration class for components required to support RSocket integration with spring-cloud-function. + * + * @author Oleg Zhurakousky + * @since 3.1 + */ +@Configuration(proxyBeanMethods = false) +@EnableConfigurationProperties({FunctionProperties.class, RSocketFunctionProperties.class}) +public class RSocketAutoConfiguration { + + private static Log logger = LogFactory.getLog(RSocketAutoConfiguration.class); + + @Bean + public FunctionToDestinationBinder functionToDestinationBinder(FunctionCatalog functionCatalog, + FunctionProperties functionProperties, RSocketFunctionProperties rSocketFunctionProperties) { + return new FunctionToDestinationBinder(functionCatalog, functionProperties, rSocketFunctionProperties); + } + + + @SuppressWarnings("rawtypes") + private static RSocket buildRSocket(String definition, Type functionType, Function function) { + RSocket clientRSocket = null; +// if (isFireAndForget(functionType)) { // fire-and-forget +// RSocket rsocket = new RSocket() { // imperative function or Function = requestResponse +// @Override +// public Mono fireAndForget(Payload p) { +// System.out.println("Invoking fireAndForget"); +// invokeFunction(p, function); +// return Mono.empty(); +// } +// }; +// return rsocket; +// } + if (isRequestRepply(functionType)) { + if (logger.isDebugEnabled()) { + logger.debug("Mapping function '" + definition + "' as RSocket `requestResponse`."); + } + + clientRSocket = new RSocket() { // imperative function or Function = requestResponse + @SuppressWarnings("unchecked") + @Override + public Mono requestResponse(Payload payload) { + Object result = invokeFunction(payload, function); + Mono invocationResult = ((Mono>) result).map(message -> DefaultPayload.create(message.getPayload())); + return invocationResult; + } + }; + } + else { + throw new UnsupportedOperationException("Only RSocket 'requestResponse' is currently supported"); + } + Assert.notNull(clientRSocket, "Failed to create RSocket for function '" + definition + "'"); + return clientRSocket; + } + + @SuppressWarnings("unchecked") + private static Mono> invokeFunction(Payload p, Function, Mono>> function) { + ByteBuffer buffer = p.getData(); + byte[] rawData = new byte[buffer.remaining()]; + buffer.get(rawData); + if (p.hasMetadata()) { + String metadata = p.getMetadataUtf8(); // TODO see what to do with it + } + Message inputMessage = MessageBuilder.withPayload(rawData).build(); + Object result = function.apply(inputMessage); + return result instanceof Mono ? (Mono>) result : Mono.just((Message) result); + } + + private static boolean isFireAndForget(Type functionType) { + Type inputType = FunctionTypeUtils.getInputType(functionType, 0); + return FunctionTypeUtils.isConsumer(functionType) && !FunctionTypeUtils.isPublisher(inputType); + } + + private static boolean isRequestRepply(Type functionType) { + Type inputType = FunctionTypeUtils.getInputType(functionType, 0); + Type outputType = FunctionTypeUtils.getOutputType(functionType, 0); + return !FunctionTypeUtils.isPublisher(inputType) && (!FunctionTypeUtils.isPublisher(outputType) || FunctionTypeUtils.isMono(outputType)); + } + + /** + * + */ + private static class FunctionToDestinationBinder implements InitializingBean { + + private final FunctionCatalog functionCatalog; + + private final FunctionProperties functionProperties; + + private final RSocketFunctionProperties rSocketFunctionProperties; + + FunctionToDestinationBinder(FunctionCatalog functionCatalog, FunctionProperties functionProperties, + RSocketFunctionProperties rSocketFunctionProperties) { + this.functionCatalog = functionCatalog; + this.functionProperties = functionProperties; + this.rSocketFunctionProperties = rSocketFunctionProperties; + } + + @SuppressWarnings("rawtypes") + @Override + public void afterPropertiesSet() throws Exception { + String definition = this.functionProperties.getDefinition(); + //TODO externalize content-type + FunctionInvocationWrapper function = functionCatalog.lookup(definition, "application/json"); + if (function.isSupplier()) { + throw new UnsupportedOperationException("Supplier is not currently supported for RSocket interaction"); + } + + Function invocableFunction = StringUtils.hasText(this.rSocketFunctionProperties.getTargetAddress()) + ? new RSocketFunction(this.rSocketFunctionProperties.getTargetAddress(), this.rSocketFunctionProperties.getTargetPort(), function) + : function; + + Type functionType = function.getFunctionType(); + RSocket rsocket = buildRSocket(definition, functionType, invocableFunction); + RSocketServer.create(SocketAcceptor.with(rsocket)) + .bind(TcpServerTransport.create(this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort())) + .subscribe(); // TODO do we need to close/dispose etc? + } + + } +} diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunction.java new file mode 100644 index 000000000..417391670 --- /dev/null +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunction.java @@ -0,0 +1,86 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.rsocket; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Function; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.core.RSocketConnector; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +/** + * Wrapper over an instance of target Function (represented by {@link FunctionInvocationWrapper}) + * which will use the result of the invocation of such function as an input to another RSocket + * effectively composing two functions over RSocket. + * + * @author Oleg Zhurakousky + * @since 3.1 + */ +class RSocketFunction implements Function, Mono>> { + + private final String bindAddress; + + private final int port; + + private final FunctionInvocationWrapper function; + + private final RSocket rSocket; + + RSocketFunction(String bindAddress, int port, FunctionInvocationWrapper function) { + this.bindAddress = bindAddress; + this.port = port; + this.function = function; + this.rSocket = RSocketConnector.connectWith(TcpClientTransport.create(this.bindAddress, this.port)) + .log() + .retryWhen(Retry.backoff(5, Duration.ofSeconds(1))) + .block(); + } + + @SuppressWarnings("unchecked") + @Override + public Mono> apply(Message input) { + Message result = (Message) function.apply(input); + Mono> resultMessage = null; + if (result != null) { + resultMessage = this.rSocket + .requestResponse(DefaultPayload.create(result.getPayload())) + .map(this::buildResultMessage); + } + return resultMessage; + } + + private Message buildResultMessage(Payload payload) { + ByteBuffer payloadBuffer = payload.getData(); + byte[] payloadData = new byte[payloadBuffer.remaining()]; + payloadBuffer.get(payloadData); + +// ByteBuffer headersBuffer = responsePayload.getMetadata(); +// byte[] rawData = new byte[payloadBuffer.remaining()]; +// payloadBuffer.get(rawData); + return MessageBuilder.withPayload(payloadData).build(); + } +} diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java new file mode 100644 index 000000000..228c271e3 --- /dev/null +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java @@ -0,0 +1,71 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.rsocket; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.function.context.FunctionProperties; + +/** + * Main configuration properties for RSocket integration with spring-cloud-function. + * The prefix for these properties is `spring.cloud.function.rscocket`. + * + * @author Oleg Zhurakousky + * @since 3.1 + */ +@ConfigurationProperties(prefix = FunctionProperties.PREFIX + ".rsocket") +public class RSocketFunctionProperties { + + private String bindAddress; + + private int bindPort; + + private String targetAddress; + + private int targetPort; + + public String getBindAddress() { + return bindAddress; + } + + public void setBindAddress(String bindAddress) { + this.bindAddress = bindAddress; + } + + public int getBindPort() { + return bindPort; + } + + public void setBindPort(int bindPort) { + this.bindPort = bindPort; + } + + public String getTargetAddress() { + return targetAddress; + } + + public void setTargetAddress(String targetAddress) { + this.targetAddress = targetAddress; + } + + public int getTargetPort() { + return targetPort; + } + + public void setTargetPort(int targetPort) { + this.targetPort = targetPort; + } +} diff --git a/spring-cloud-function-rsocket/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-cloud-function-rsocket/src/main/resources/META-INF/additional-spring-configuration-metadata.json new file mode 100644 index 000000000..217d1affe --- /dev/null +++ b/spring-cloud-function-rsocket/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -0,0 +1,11 @@ +{ + "properties": [ + { + "name": "spring.cloud.function.web.path", + "type": "java.lang.String", + "description": "Path to web resources for functions (should start with / if not empty).", + "defaultValue": "" + } + ] +} + diff --git a/spring-cloud-function-rsocket/src/main/resources/META-INF/spring.factories b/spring-cloud-function-rsocket/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..a1476973a --- /dev/null +++ b/spring-cloud-function-rsocket/src/main/resources/META-INF/spring.factories @@ -0,0 +1 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframework.cloud.function.rsocket.RSocketAutoConfiguration diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java new file mode 100644 index 000000000..97278aacd --- /dev/null +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java @@ -0,0 +1,127 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.rsocket; + +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Function; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.core.RSocketConnector; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; +import org.junit.jupiter.api.Test; +import reactor.util.retry.Retry; + +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * + * @author Oleg Zhurakousky + * @since 3.1 + */ +public class RSocketAutoConfigurationTests { + + @Test + public void testRequestReplyFunction() throws Exception { + new SpringApplicationBuilder(SampleFunctionConfiguration.class).run( + "--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.cloud.function.definition=uppercase", + "--spring.cloud.function.rsocket.bind-address=localhost", + "--spring.cloud.function.rsocket.bind-port=12345"); + + RSocket socket = RSocketConnector.connectWith(TcpClientTransport.create("localhost", 12345)).log() + .retryWhen(Retry.backoff(5, Duration.ofSeconds(1))).block(); + String result = socket.requestResponse(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8).block(); + + assertThat(result).isEqualTo("\"HELLO\""); + } + + @Test + public void testRequestReplyFunctionWithComposition() throws Exception { + new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run( + "--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.cloud.function.definition=uppercase", + "--spring.cloud.function.rsocket.bind-address=localhost", + "--spring.cloud.function.rsocket.bind-port=12345"); + + new SpringApplicationBuilder(AdditionalFunctionConfiguration.class).web(WebApplicationType.NONE).run( + "--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.cloud.function.definition=reverse", "--spring.cloud.function.rsocket.bind-address=localhost", + "--spring.cloud.function.rsocket.bind-port=12346", + "--spring.cloud.function.rsocket.target-address=localhost", + "--spring.cloud.function.rsocket.target-port=12345"); + + RSocket socket = RSocketConnector.connectWith(TcpClientTransport.create("localhost", 12346)).log() + .retryWhen(Retry.backoff(5, Duration.ofSeconds(1))).block(); + String result = socket.requestResponse(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8).block(); + + assertThat(result).isEqualTo("\"OLLEH\""); + } + +// @Test +// public void testFireAndForgetConsumer() throws Exception { +// new SpringApplicationBuilder(SampleFunctionConfiguration.class) +// .run("--logging.level.org.springframework.cloud.function=DEBUG", +// "--spring.cloud.function.definition=log"); +// +// RSocket socket = RSocketConnector.connectWith(TcpClientTransport.create("localhost", 7000)) +// .log() +// .retryWhen(Retry.backoff(5, Duration.ofSeconds(1))) +// .block(); +// socket.fireAndForget(DefaultPayload.create("Hello")) +// .log() +// .onErrorContinue((e, x) -> { +// System.out.println(e); +// }) +// .block(); +// Thread.sleep(2000); +// System.out.println(); +// } + + @EnableAutoConfiguration + @Configuration + public static class SampleFunctionConfiguration { + @Bean + public Function uppercase() { + return v -> v.toUpperCase(); + } + + @Bean + public Consumer log() { + return v -> { + System.out.println("==> In Consumer: " + new String(v)); + }; + } + } + + @EnableAutoConfiguration + @Configuration + public static class AdditionalFunctionConfiguration { + @Bean + public Function reverse() { + return v -> new StringBuilder(v).reverse().toString(); + } + } +}