Initial Committ of RSocket integration

This commit is contained in:
Oleg Zhurakousky
2020-06-29 14:44:29 +02:00
parent cefe52365b
commit c91e2a00e4
10 changed files with 533 additions and 0 deletions

View File

@@ -0,0 +1,165 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.rsocket;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.function.Function;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* Main configuration class for components required to support RSocket integration with spring-cloud-function.
*
* @author Oleg Zhurakousky
* @since 3.1
*/
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties({FunctionProperties.class, RSocketFunctionProperties.class})
public class RSocketAutoConfiguration {
private static Log logger = LogFactory.getLog(RSocketAutoConfiguration.class);
@Bean
public FunctionToDestinationBinder functionToDestinationBinder(FunctionCatalog functionCatalog,
FunctionProperties functionProperties, RSocketFunctionProperties rSocketFunctionProperties) {
return new FunctionToDestinationBinder(functionCatalog, functionProperties, rSocketFunctionProperties);
}
@SuppressWarnings("rawtypes")
private static RSocket buildRSocket(String definition, Type functionType, Function function) {
RSocket clientRSocket = null;
// if (isFireAndForget(functionType)) { // fire-and-forget
// RSocket rsocket = new RSocket() { // imperative function or Function<?, Mono> = requestResponse
// @Override
// public Mono<Void> fireAndForget(Payload p) {
// System.out.println("Invoking fireAndForget");
// invokeFunction(p, function);
// return Mono.empty();
// }
// };
// return rsocket;
// }
if (isRequestRepply(functionType)) {
if (logger.isDebugEnabled()) {
logger.debug("Mapping function '" + definition + "' as RSocket `requestResponse`.");
}
clientRSocket = new RSocket() { // imperative function or Function<?, Mono> = requestResponse
@SuppressWarnings("unchecked")
@Override
public Mono<Payload> requestResponse(Payload payload) {
Object result = invokeFunction(payload, function);
Mono<Payload> invocationResult = ((Mono<Message<byte[]>>) result).map(message -> DefaultPayload.create(message.getPayload()));
return invocationResult;
}
};
}
else {
throw new UnsupportedOperationException("Only RSocket 'requestResponse' is currently supported");
}
Assert.notNull(clientRSocket, "Failed to create RSocket for function '" + definition + "'");
return clientRSocket;
}
@SuppressWarnings("unchecked")
private static Mono<Message<byte[]>> invokeFunction(Payload p, Function<Message<byte[]>, Mono<Message<byte[]>>> function) {
ByteBuffer buffer = p.getData();
byte[] rawData = new byte[buffer.remaining()];
buffer.get(rawData);
if (p.hasMetadata()) {
String metadata = p.getMetadataUtf8(); // TODO see what to do with it
}
Message<byte[]> inputMessage = MessageBuilder.withPayload(rawData).build();
Object result = function.apply(inputMessage);
return result instanceof Mono ? (Mono<Message<byte[]>>) result : Mono.just((Message<byte[]>) result);
}
private static boolean isFireAndForget(Type functionType) {
Type inputType = FunctionTypeUtils.getInputType(functionType, 0);
return FunctionTypeUtils.isConsumer(functionType) && !FunctionTypeUtils.isPublisher(inputType);
}
private static boolean isRequestRepply(Type functionType) {
Type inputType = FunctionTypeUtils.getInputType(functionType, 0);
Type outputType = FunctionTypeUtils.getOutputType(functionType, 0);
return !FunctionTypeUtils.isPublisher(inputType) && (!FunctionTypeUtils.isPublisher(outputType) || FunctionTypeUtils.isMono(outputType));
}
/**
*
*/
private static class FunctionToDestinationBinder implements InitializingBean {
private final FunctionCatalog functionCatalog;
private final FunctionProperties functionProperties;
private final RSocketFunctionProperties rSocketFunctionProperties;
FunctionToDestinationBinder(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
RSocketFunctionProperties rSocketFunctionProperties) {
this.functionCatalog = functionCatalog;
this.functionProperties = functionProperties;
this.rSocketFunctionProperties = rSocketFunctionProperties;
}
@SuppressWarnings("rawtypes")
@Override
public void afterPropertiesSet() throws Exception {
String definition = this.functionProperties.getDefinition();
//TODO externalize content-type
FunctionInvocationWrapper function = functionCatalog.lookup(definition, "application/json");
if (function.isSupplier()) {
throw new UnsupportedOperationException("Supplier is not currently supported for RSocket interaction");
}
Function invocableFunction = StringUtils.hasText(this.rSocketFunctionProperties.getTargetAddress())
? new RSocketFunction(this.rSocketFunctionProperties.getTargetAddress(), this.rSocketFunctionProperties.getTargetPort(), function)
: function;
Type functionType = function.getFunctionType();
RSocket rsocket = buildRSocket(definition, functionType, invocableFunction);
RSocketServer.create(SocketAcceptor.with(rsocket))
.bind(TcpServerTransport.create(this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort()))
.subscribe(); // TODO do we need to close/dispose etc?
}
}
}

View File

@@ -0,0 +1,86 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.rsocket;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.function.Function;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
/**
* Wrapper over an instance of target Function (represented by {@link FunctionInvocationWrapper})
* which will use the result of the invocation of such function as an input to another RSocket
* effectively composing two functions over RSocket.
*
* @author Oleg Zhurakousky
* @since 3.1
*/
class RSocketFunction implements Function<Message<byte[]>, Mono<Message<byte[]>>> {
private final String bindAddress;
private final int port;
private final FunctionInvocationWrapper function;
private final RSocket rSocket;
RSocketFunction(String bindAddress, int port, FunctionInvocationWrapper function) {
this.bindAddress = bindAddress;
this.port = port;
this.function = function;
this.rSocket = RSocketConnector.connectWith(TcpClientTransport.create(this.bindAddress, this.port))
.log()
.retryWhen(Retry.backoff(5, Duration.ofSeconds(1)))
.block();
}
@SuppressWarnings("unchecked")
@Override
public Mono<Message<byte[]>> apply(Message<byte[]> input) {
Message<byte[]> result = (Message<byte[]>) function.apply(input);
Mono<Message<byte[]>> resultMessage = null;
if (result != null) {
resultMessage = this.rSocket
.requestResponse(DefaultPayload.create(result.getPayload()))
.map(this::buildResultMessage);
}
return resultMessage;
}
private Message<byte[]> buildResultMessage(Payload payload) {
ByteBuffer payloadBuffer = payload.getData();
byte[] payloadData = new byte[payloadBuffer.remaining()];
payloadBuffer.get(payloadData);
// ByteBuffer headersBuffer = responsePayload.getMetadata();
// byte[] rawData = new byte[payloadBuffer.remaining()];
// payloadBuffer.get(rawData);
return MessageBuilder.withPayload(payloadData).build();
}
}

View File

@@ -0,0 +1,71 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.rsocket;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.function.context.FunctionProperties;
/**
* Main configuration properties for RSocket integration with spring-cloud-function.
* The prefix for these properties is `spring.cloud.function.rscocket`.
*
* @author Oleg Zhurakousky
* @since 3.1
*/
@ConfigurationProperties(prefix = FunctionProperties.PREFIX + ".rsocket")
public class RSocketFunctionProperties {
private String bindAddress;
private int bindPort;
private String targetAddress;
private int targetPort;
public String getBindAddress() {
return bindAddress;
}
public void setBindAddress(String bindAddress) {
this.bindAddress = bindAddress;
}
public int getBindPort() {
return bindPort;
}
public void setBindPort(int bindPort) {
this.bindPort = bindPort;
}
public String getTargetAddress() {
return targetAddress;
}
public void setTargetAddress(String targetAddress) {
this.targetAddress = targetAddress;
}
public int getTargetPort() {
return targetPort;
}
public void setTargetPort(int targetPort) {
this.targetPort = targetPort;
}
}

View File

@@ -0,0 +1,11 @@
{
"properties": [
{
"name": "spring.cloud.function.web.path",
"type": "java.lang.String",
"description": "Path to web resources for functions (should start with / if not empty).",
"defaultValue": ""
}
]
}

View File

@@ -0,0 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframework.cloud.function.rsocket.RSocketAutoConfiguration

View File

@@ -0,0 +1,127 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.rsocket;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Function;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import org.junit.jupiter.api.Test;
import reactor.util.retry.Retry;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static org.assertj.core.api.Assertions.assertThat;
/**
*
* @author Oleg Zhurakousky
* @since 3.1
*/
public class RSocketAutoConfigurationTests {
@Test
public void testRequestReplyFunction() throws Exception {
new SpringApplicationBuilder(SampleFunctionConfiguration.class).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.function.rsocket.bind-address=localhost",
"--spring.cloud.function.rsocket.bind-port=12345");
RSocket socket = RSocketConnector.connectWith(TcpClientTransport.create("localhost", 12345)).log()
.retryWhen(Retry.backoff(5, Duration.ofSeconds(1))).block();
String result = socket.requestResponse(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8).block();
assertThat(result).isEqualTo("\"HELLO\"");
}
@Test
public void testRequestReplyFunctionWithComposition() throws Exception {
new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.function.rsocket.bind-address=localhost",
"--spring.cloud.function.rsocket.bind-port=12345");
new SpringApplicationBuilder(AdditionalFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.definition=reverse", "--spring.cloud.function.rsocket.bind-address=localhost",
"--spring.cloud.function.rsocket.bind-port=12346",
"--spring.cloud.function.rsocket.target-address=localhost",
"--spring.cloud.function.rsocket.target-port=12345");
RSocket socket = RSocketConnector.connectWith(TcpClientTransport.create("localhost", 12346)).log()
.retryWhen(Retry.backoff(5, Duration.ofSeconds(1))).block();
String result = socket.requestResponse(DefaultPayload.create("\"hello\"")).map(Payload::getDataUtf8).block();
assertThat(result).isEqualTo("\"OLLEH\"");
}
// @Test
// public void testFireAndForgetConsumer() throws Exception {
// new SpringApplicationBuilder(SampleFunctionConfiguration.class)
// .run("--logging.level.org.springframework.cloud.function=DEBUG",
// "--spring.cloud.function.definition=log");
//
// RSocket socket = RSocketConnector.connectWith(TcpClientTransport.create("localhost", 7000))
// .log()
// .retryWhen(Retry.backoff(5, Duration.ofSeconds(1)))
// .block();
// socket.fireAndForget(DefaultPayload.create("Hello"))
// .log()
// .onErrorContinue((e, x) -> {
// System.out.println(e);
// })
// .block();
// Thread.sleep(2000);
// System.out.println();
// }
@EnableAutoConfiguration
@Configuration
public static class SampleFunctionConfiguration {
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
@Bean
public Consumer<byte[]> log() {
return v -> {
System.out.println("==> In Consumer: " + new String(v));
};
}
}
@EnableAutoConfiguration
@Configuration
public static class AdditionalFunctionConfiguration {
@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();
}
}
}