GH-744 Add initial support for bi-directional gRPC

Resolves #744
This commit is contained in:
Oleg Zhurakousky
2021-09-20 14:55:05 +02:00
parent 3b4b7aae5c
commit 9b11199eab
8 changed files with 244 additions and 113 deletions

View File

@@ -1,99 +1,8 @@
### Introduction
Spring Cloud Function allows you to invoke function via [RSocket](https://rsocket.io/). While you can read more about RSocket and its java
implementation [here](https://github.com/rsocket/rsocket-java), this section will describe the parts relevant to Spring Cloud Function integration.
Spring Cloud Function allows you to invoke function via [gRPC](https://grpc.io/).
TBD
### Programming model
From the user perspective bringing RSocket does not change the implementation of functions or any of its features, such as type conversion,
composition, POJO functions etc.
And while RSocket allows first class reactive interaction over the network supporting important reactive features such as back pressure,
users of Spring Cloud Function still have freedom to implement their business logic using reactive or imperative functions delegating any
adjustment needed to apply proper invocation model to the framework.
To use RSocket integration all you need is to add `spring-cloud-function-rsocket` dependency to your classpath
```
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-rsocket</artifactId>
</dependency>
```
To interact with functions via RSocket we rely on Spring Boot support for RSocket and `RSocketRequester.Builder` API.
The code below shows the key parts and you can get more details on various interaction models
from [this test case](https://github.com/spring-cloud/spring-cloud-function/blob/master/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java).
```
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
. . .
RSocketRequester.Builder rsocketRequesterBuilder =
applicationContext.getBean(RSocketRequester.Builder.class);
rsocketRequesterBuilder.tcp("localhost", port)
.route(“uppercase")
.data("\"hello\"")
.retrieveMono(String.class)
.subscribe(System.out::println);
```
Once connected to RSocket we use `route` operation to specify which function we want to invoke providing the actual
payload via `data` operation. Then we use one of the `retrieve` operations that best suits our desired interaction
(RSocket supports multiple interaction models such as fire-and-forget, request-reply etc.)
#### Order of priority for routing instructions
As you can see from the preceding examples, we provide function definition as a value to `route(..)` operator of `RSocketRequester.Builder`.
However that is not the only way. You can also use standard `spring.cloud.function.definition` property as well as `spring.cloud.function.routing-expression` or property or `MessageRoutingCallback` on the server side of the RSocket interaction (see "Function Routing and Filtering" section of reference manual).
This raises a question of _order_ and _priorities_ when it comes to reconsiling a conflict in the event several ways of providing definition are used. So it is a mater of clearly stating the rule whcih is:
***1 - MessageRoutingCallback***
The `MessageRoutingCallback` takes precedence over all other ways of providing function definition resolution.
***2 - spring.cloud.function.routing-expression***
The `spring.cloud.function.routing-expression` property takes next precedence. So, in the event you may have also use `route(..)` operator or `spring.cloud.function.definition` property, they will be ignored if `spring.cloud.function.routing-expression` property is provided.
***3 - route(..)***
The next in line is `route(..)` operator. So in the event there are no `spring.cloud.function.routing-expression` property but you defined `spring.cloud.function.definition` property, it will be ignored in favor of definition provided by the `route(..)` operator.
***4 - spring.cloud.function.definition***
The `spring.cloud.function.definition` property is the last in the list allowing you to simply `route("")` to empty string.
### Messaging
If you want to provide and/or receive additional information that you would normally communicate via Message headers you can send and receive Spring `Message`.
For example, the following tests case demonstrates how you can accomplish that.
```
Person p = new Person();
p.setName("Ricky");
Message<Person> message = MessageBuilder.withPayload(p).setHeader("someHeader", "foo").build();
Message<Employee> result = rsocketRequesterBuilder.tcp("localhost", port)
.route("pojoMessageToPojo")
.data(message)
.retrieveMono(new ParameterizedTypeReference<Message<Employee>>() {})
.block();
```
Aside from sending `Message`, note the usage of `ParameterizedTypeReference` to specify that we want not only `Message` in return but also `Message` with specific payload type.
### Function Composition over RSocket (Distributed Function Composition)
By now you shoudl be familiar with the standard function composition feature (e.g., `functionA|functionB|functionC`). This feature allows you to compose several co-located functions into one. But what if these functions are not co-located and instead separated by the network?
With RSocket and our _distributed function composition_ feature you can still do it. So let's look at the example.
Let's say we have `uppercase` function available to you locally and `reverse` function exposed via separate RSocket and you wan to compose `uppercase` and `reverse` into a single function. Had they been both available locally it would have been as simple as `uppercase|reverse`, but given that `reverse` function is not locally available we need a way to specify that in our composition instruction. For that we're using _redirect_ operator. So it woudl look like this `uppercase>localhost:2222`, where `localhost:2222` is the host/port combination where `reverse` function is hosted.
What's interesting is that remote function can in itself be a result of function composition (local or remote), so effectively you are composing `uppercase` with whatever function definition (which could be composition) that is running on `localhost:2222`.
The complete example is available in [this test case](https://github.com/spring-cloud/spring-cloud-function/blob/0e3a27a392f5c69727d909db26c2ba6aa0344cfd/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java#L371).
And as you can see it is a bit more complex to showcase this feature. In this test we are composing `reverse` function with `uppercase|concat` function running remotely and then with `wrap` function running locally as if `reverse|uppercase|concat|wrap`.
So you can see `--spring.cloud.function.definition=reverse>localhost:" + portA + "|wrap"` where `localhost:" + portA` points to another application context instance with `--spring.cloud.function.definition=uppercase|concat`. The result of the `reverse` function are sent to `uppercase|concat` function via RSocket and the result of that are fed into `wrap` function.
### Samples
You can also look at one of the [RSocket samples](https://github.com/spring-cloud/spring-cloud-function/tree/master/spring-cloud-function-samples/function-sample-cloudevent-rsocket) that is also introduces you to Cloud Events
TBD

View File

@@ -17,6 +17,7 @@
<properties>
<grpc.version>1.16.1</grpc.version>
<checkstyle.skip>true</checkstyle.skip>
</properties>
<dependencies>
@@ -60,6 +61,17 @@
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<!-- <executions> -->
<!-- <execution> -->
<!-- <id>checkstyle-validation</id> -->
<!-- <phase>none</phase> -->
<!-- </execution> -->
<!-- </executions> -->
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>

View File

@@ -31,7 +31,7 @@ public class FunctionGrpcProperties {
/**
* Default gRPC port.
*/
public final static int GRPC_PORT = 55555;
public final static int GRPC_PORT = 6048;
private int port = GRPC_PORT;

View File

@@ -30,10 +30,10 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(FunctionGrpcProperties.class)
@ConditionalOnProperty(name = "spring.cloud.function.grpc.mode", havingValue = "server", matchIfMissing = false)
public class GrpcAutoConfiguration {
@Bean
@ConditionalOnProperty(name = "spring.cloud.function.grpc.mode", havingValue = "server", matchIfMissing = false)
public GrpcServer grpcServer(FunctionGrpcProperties grpcProperties, GrpcMessagingServiceImpl grpcMessagingService) {
return new GrpcServer(grpcProperties, grpcMessagingService);
}

View File

@@ -32,32 +32,48 @@
package org.springframework.cloud.function.grpc;
import java.util.concurrent.atomic.AtomicBoolean;
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.grpc.MessagingServiceGrpc.MessagingServiceImplBase;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
*
* @author Oleg Zhurakousky
* @since 3.2
*
*/
class GrpcMessagingServiceImpl extends MessagingServiceImplBase {
private Log logger = LogFactory.getLog(GrpcMessagingServiceImpl.class);
private final FunctionInvocationWrapper function;
GrpcMessagingServiceImpl(FunctionProperties funcProperties, FunctionCatalog functionCatalog) {
this.function = functionCatalog.lookup(funcProperties.getDefinition(), "application/json");
Assert.notNull(this.function, "Failed to lookup function " + funcProperties.getDefinition());
}
@Override
@SuppressWarnings("unchecked")
public void requestReply(GrpcMessage request, StreamObserver<GrpcMessage> responseObserver) {
Message<byte[]> message = GrpcUtils.fromGrpcMessage(request);
Message<byte[]> replyMessage = (Message<byte[]>) this.function.apply(message);
GrpcMessage reply = GrpcUtils.toGrpcMessage(replyMessage);
/*
* The above is effectively echo. This is where we plug in function invocation
*/
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@@ -74,9 +90,61 @@ class GrpcMessagingServiceImpl extends MessagingServiceImplBase {
// return null;
// }
//
// @Override
// public StreamObserver<GrpcMessage> biStream(
// StreamObserver<GrpcMessage> responseObserver) {
// return null;
// }
@Override
@SuppressWarnings("unchecked")
public StreamObserver<GrpcMessage> biStream(StreamObserver<GrpcMessage> responseObserver) {
ServerCallStreamObserver<GrpcMessage> serverCallStreamObserver = (ServerCallStreamObserver<GrpcMessage>) responseObserver;
serverCallStreamObserver.disableAutoInboundFlowControl();
AtomicBoolean wasReady = new AtomicBoolean(false);
serverCallStreamObserver.setOnReadyHandler(() -> {
if (serverCallStreamObserver.isReady() && !wasReady.get()) {
wasReady.set(true);
logger.info("Server stream is ready");
serverCallStreamObserver.request(1);
}
});
return new StreamObserver<GrpcMessage>() {
@Override
public void onNext(GrpcMessage request) {
try {
Message<byte[]> message = GrpcUtils.fromGrpcMessage(request);
Message<byte[]> replyMessage = (Message<byte[]>) function
.apply(message);
GrpcMessage reply = GrpcUtils.toGrpcMessage(replyMessage);
responseObserver.onNext(reply);
// Check the provided ServerCallStreamObserver to see if it is still
// ready to accept more messages.
if (serverCallStreamObserver.isReady()) {
serverCallStreamObserver.request(1);
}
else {
wasReady.set(false);
}
}
catch (Throwable throwable) {
throwable.printStackTrace();
responseObserver.onError(
Status.UNKNOWN.withDescription("Error handling request").withCause(throwable).asException());
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
responseObserver.onCompleted();
}
@Override
public void onCompleted() {
logger.info("Server Stream is complete");
responseObserver.onCompleted();
}
};
}
}

View File

@@ -29,7 +29,7 @@ import org.springframework.context.SmartLifecycle;
class GrpcServer implements SmartLifecycle {
protected Log logger = LogFactory.getLog(GrpcServer.class);
private Log logger = LogFactory.getLog(GrpcServer.class);
private final FunctionGrpcProperties grpcProperties;

View File

@@ -22,6 +22,13 @@ import java.util.Map;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
@@ -34,6 +41,8 @@ import org.springframework.messaging.support.MessageBuilder;
*/
public final class GrpcUtils {
private static Log logger = LogFactory.getLog(GrpcUtils.class);
private GrpcUtils() {
}
@@ -64,7 +73,7 @@ public final class GrpcUtils {
}
public static Message<byte[]> requestReply(String host, int port, Message<byte[]> inputMessage) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port)
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext().build();
MessagingServiceGrpc.MessagingServiceBlockingStub stub = MessagingServiceGrpc
.newBlockingStub(channel);
@@ -73,4 +82,98 @@ public final class GrpcUtils {
channel.shutdown();
return fromGrpcMessage(response);
}
/**
* Utility method to support bi-directional streaming interaction. Will connect to gRPC server using default host/port,
* otherwise use {@link #biStreaming(String, int, Flux)} method.
*
* Keep in mind that there is no implied relationship between input stream and output stream.
* They are completely independent where one may end before the other.
*
* @param inputStream {@code FluxMessage<byte[]>>} representing input stream.
* @return {@code FluxMessage<byte[]>>} representing output stream
*/
public static Flux<Message<byte[]>> biStreaming(Flux<Message<byte[]>> inputStream) {
return biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, inputStream);
}
/**
* Utility method to support bi-directional streaming interaction.
* Keep in mind that there is no implied relationship between input stream and output stream.
* They are completely independent where one may end before the other.
*
* @param host gRPC server host name
* @param port gRPC server port
* @param inputStream {@code FluxMessage<byte[]>>} representing input stream
* @return {@code FluxMessage<byte[]>>} representing output stream
*/
public static Flux<Message<byte[]>> biStreaming(String host, int port, Flux<Message<byte[]>> inputStream) {
ManagedChannel channel = ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext().build();
MessagingServiceGrpc.MessagingServiceStub stub = MessagingServiceGrpc
.newStub(channel);
Many<Message<byte[]>> sink = Sinks.many().unicast().onBackpressureBuffer();
ClientResponseObserver<GrpcMessage, GrpcMessage> clientResponseObserver = clientResponseObserver(inputStream, sink);
stub.biStream(clientResponseObserver);
return sink.asFlux().doOnComplete(() -> {
logger.debug("Shutting down channel");
channel.shutdown();
});
}
private static ClientResponseObserver<GrpcMessage, GrpcMessage> clientResponseObserver(Flux<Message<byte[]>> inputStream, Many<Message<byte[]>> sink) {
return new ClientResponseObserver<GrpcMessage, GrpcMessage>() {
ClientCallStreamObserver<GrpcMessage> requestStreamObserver;
@Override
public void beforeStart(ClientCallStreamObserver<GrpcMessage> requestStreamObserver) {
this.requestStreamObserver = requestStreamObserver;
requestStreamObserver.disableAutoInboundFlowControl();
requestStreamObserver.setOnReadyHandler(new Runnable() {
@Override
public void run() {
inputStream
.doOnNext(request -> {
if (logger.isDebugEnabled()) {
logger.debug("Sending message: " + request);
}
requestStreamObserver.onNext(GrpcUtils.toGrpcMessage(request));
})
.doOnComplete(() -> {
requestStreamObserver.onCompleted();
})
.subscribe();
}
});
}
@Override
public void onNext(GrpcMessage message) {
if (logger.isDebugEnabled()) {
logger.debug("Receiving message: " + message);
}
sink.tryEmitNext(fromGrpcMessage(message));
requestStreamObserver.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
logger.info("Client stream is complete");
sink.tryEmitComplete(); // TODO revisit as this would complete the server stream simply because the client is done.
// Perhaps we need to expose some boolean value when this is desirable
}
};
}
}

View File

@@ -16,11 +16,13 @@
package org.springframework.cloud.function.grpc;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -34,18 +36,23 @@ import org.springframework.util.MimeTypeUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
*
* @author Oleg Zhurakousky
*
*/
public class GrpcInteractionTests {
@Test
public void test() {
public void testRequestReply() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.function.grpc.port=55555",
"--spring.cloud.function.grpc.port=" + FunctionGrpcProperties.GRPC_PORT,
"--spring.cloud.function.grpc.mode=server")) {
Message<byte[]> message = MessageBuilder.withPayload("hello gRPC".getBytes())
Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
.setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build();
@@ -56,6 +63,38 @@ public class GrpcInteractionTests {
}
}
@Test
public void testBidirectionalStream() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.function.grpc.port="
+ FunctionGrpcProperties.GRPC_PORT,
"--spring.cloud.function.grpc.mode=server")) {
List<Message<byte[]>> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
Flux<Message<byte[]>> clientResponseObserver =
GrpcUtils.biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages));
List<Message<byte[]>> results = clientResponseObserver.collectList().block(Duration.ofSeconds(1));
assertThat(results.size()).isEqualTo(3);
assertThat(results.get(0).getPayload()).isEqualTo("\"RICKY\"".getBytes());
assertThat(results.get(1).getPayload()).isEqualTo("\"JULIEN\"".getBytes());
assertThat(results.get(2).getPayload()).isEqualTo("\"BUBBLES\"".getBytes());
}
}
@EnableAutoConfiguration
public static class SampleConfiguration {