diff --git a/spring-cloud-function-grpc/README.md b/spring-cloud-function-grpc/README.md
index 5ce6c0c2e..e53c02f68 100644
--- a/spring-cloud-function-grpc/README.md
+++ b/spring-cloud-function-grpc/README.md
@@ -1,99 +1,8 @@
### Introduction
-Spring Cloud Function allows you to invoke function via [RSocket](https://rsocket.io/). While you can read more about RSocket and it’s java
-implementation [here](https://github.com/rsocket/rsocket-java), this section will describe the parts relevant to Spring Cloud Function integration.
+Spring Cloud Function allows you to invoke function via [gRPC](https://grpc.io/).
+
+TBD
### Programming model
-From the user perspective bringing RSocket does not change the implementation of functions or any of its features, such as type conversion,
-composition, POJO functions etc.
-And while RSocket allows first class reactive interaction over the network supporting important reactive features such as back pressure,
-users of Spring Cloud Function still have freedom to implement their business logic using reactive or imperative functions delegating any
-adjustment needed to apply proper invocation model to the framework.
-
-To use RSocket integration all you need is to add `spring-cloud-function-rsocket` dependency to your classpath
-```
-
- org.springframework.cloud
- spring-cloud-function-rsocket
-
-```
-
-To interact with functions via RSocket we rely on Spring Boot support for RSocket and `RSocketRequester.Builder` API.
-The code below shows the key parts and you can get more details on various interaction models
-from [this test case](https://github.com/spring-cloud/spring-cloud-function/blob/master/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java).
-
-
-```
-@Bean
-public Function uppercase() {
- return v -> v.toUpperCase();
-}
-
-. . .
-
-RSocketRequester.Builder rsocketRequesterBuilder =
- applicationContext.getBean(RSocketRequester.Builder.class);
-
-rsocketRequesterBuilder.tcp("localhost", port)
- .route(“uppercase")
- .data("\"hello\"")
- .retrieveMono(String.class)
- .subscribe(System.out::println);
-```
-
-Once connected to RSocket we use `route` operation to specify which function we want to invoke providing the actual
-payload via `data` operation. Then we use one of the `retrieve` operations that best suits our desired interaction
-(RSocket supports multiple interaction models such as fire-and-forget, request-reply etc.)
-
-#### Order of priority for routing instructions
-
-As you can see from the preceding examples, we provide function definition as a value to `route(..)` operator of `RSocketRequester.Builder`.
-However that is not the only way. You can also use standard `spring.cloud.function.definition` property as well as `spring.cloud.function.routing-expression` or property or `MessageRoutingCallback` on the server side of the RSocket interaction (see "Function Routing and Filtering" section of reference manual).
-This raises a question of _order_ and _priorities_ when it comes to reconsiling a conflict in the event several ways of providing definition are used. So it is a mater of clearly stating the rule whcih is:
-
-***1 - MessageRoutingCallback***
-The `MessageRoutingCallback` takes precedence over all other ways of providing function definition resolution.
-
-***2 - spring.cloud.function.routing-expression***
-The `spring.cloud.function.routing-expression` property takes next precedence. So, in the event you may have also use `route(..)` operator or `spring.cloud.function.definition` property, they will be ignored if `spring.cloud.function.routing-expression` property is provided.
-
-***3 - route(..)***
-The next in line is `route(..)` operator. So in the event there are no `spring.cloud.function.routing-expression` property but you defined `spring.cloud.function.definition` property, it will be ignored in favor of definition provided by the `route(..)` operator.
-
-***4 - spring.cloud.function.definition***
-The `spring.cloud.function.definition` property is the last in the list allowing you to simply `route("")` to empty string.
-
-
-### Messaging
-
-If you want to provide and/or receive additional information that you would normally communicate via Message headers you can send and receive Spring `Message`.
-For example, the following tests case demonstrates how you can accomplish that.
-```
-Person p = new Person();
-p.setName("Ricky");
-Message message = MessageBuilder.withPayload(p).setHeader("someHeader", "foo").build();
-
-Message result = rsocketRequesterBuilder.tcp("localhost", port)
- .route("pojoMessageToPojo")
- .data(message)
- .retrieveMono(new ParameterizedTypeReference>() {})
- .block();
-```
-Aside from sending `Message`, note the usage of `ParameterizedTypeReference` to specify that we want not only `Message` in return but also `Message` with specific payload type.
-
-### Function Composition over RSocket (Distributed Function Composition)
-
-By now you shoudl be familiar with the standard function composition feature (e.g., `functionA|functionB|functionC`). This feature allows you to compose several co-located functions into one. But what if these functions are not co-located and instead separated by the network?
-
-With RSocket and our _distributed function composition_ feature you can still do it. So let's look at the example.
-
-Let's say we have `uppercase` function available to you locally and `reverse` function exposed via separate RSocket and you wan to compose `uppercase` and `reverse` into a single function. Had they been both available locally it would have been as simple as `uppercase|reverse`, but given that `reverse` function is not locally available we need a way to specify that in our composition instruction. For that we're using _redirect_ operator. So it woudl look like this `uppercase>localhost:2222`, where `localhost:2222` is the host/port combination where `reverse` function is hosted.
-What's interesting is that remote function can in itself be a result of function composition (local or remote), so effectively you are composing `uppercase` with whatever function definition (which could be composition) that is running on `localhost:2222`.
-
-The complete example is available in [this test case](https://github.com/spring-cloud/spring-cloud-function/blob/0e3a27a392f5c69727d909db26c2ba6aa0344cfd/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java#L371).
-And as you can see it is a bit more complex to showcase this feature. In this test we are composing `reverse` function with `uppercase|concat` function running remotely and then with `wrap` function running locally as if `reverse|uppercase|concat|wrap`.
-So you can see `--spring.cloud.function.definition=reverse>localhost:" + portA + "|wrap"` where `localhost:" + portA` points to another application context instance with `--spring.cloud.function.definition=uppercase|concat`. The result of the `reverse` function are sent to `uppercase|concat` function via RSocket and the result of that are fed into `wrap` function.
-
-### Samples
-
-You can also look at one of the [RSocket samples](https://github.com/spring-cloud/spring-cloud-function/tree/master/spring-cloud-function-samples/function-sample-cloudevent-rsocket) that is also introduces you to Cloud Events
\ No newline at end of file
+TBD
\ No newline at end of file
diff --git a/spring-cloud-function-grpc/pom.xml b/spring-cloud-function-grpc/pom.xml
index 85affe799..47342b24f 100644
--- a/spring-cloud-function-grpc/pom.xml
+++ b/spring-cloud-function-grpc/pom.xml
@@ -17,6 +17,7 @@
1.16.1
+ true
@@ -60,6 +61,17 @@
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+
+
+
+
+
org.xolstice.maven.plugins
protobuf-maven-plugin
diff --git a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/FunctionGrpcProperties.java b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/FunctionGrpcProperties.java
index c59c41398..526bbed6c 100644
--- a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/FunctionGrpcProperties.java
+++ b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/FunctionGrpcProperties.java
@@ -31,7 +31,7 @@ public class FunctionGrpcProperties {
/**
* Default gRPC port.
*/
- public final static int GRPC_PORT = 55555;
+ public final static int GRPC_PORT = 6048;
private int port = GRPC_PORT;
diff --git a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcAutoConfiguration.java b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcAutoConfiguration.java
index a1462c65e..0e25b8a60 100644
--- a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcAutoConfiguration.java
+++ b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcAutoConfiguration.java
@@ -30,10 +30,10 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(FunctionGrpcProperties.class)
+@ConditionalOnProperty(name = "spring.cloud.function.grpc.mode", havingValue = "server", matchIfMissing = false)
public class GrpcAutoConfiguration {
@Bean
- @ConditionalOnProperty(name = "spring.cloud.function.grpc.mode", havingValue = "server", matchIfMissing = false)
public GrpcServer grpcServer(FunctionGrpcProperties grpcProperties, GrpcMessagingServiceImpl grpcMessagingService) {
return new GrpcServer(grpcProperties, grpcMessagingService);
}
diff --git a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcMessagingServiceImpl.java b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcMessagingServiceImpl.java
index 3187b8561..7be957145 100644
--- a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcMessagingServiceImpl.java
+++ b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcMessagingServiceImpl.java
@@ -32,32 +32,48 @@
package org.springframework.cloud.function.grpc;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.grpc.Status;
+import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.grpc.MessagingServiceGrpc.MessagingServiceImplBase;
import org.springframework.messaging.Message;
+import org.springframework.util.Assert;
+/**
+ *
+ * @author Oleg Zhurakousky
+ * @since 3.2
+ *
+ */
class GrpcMessagingServiceImpl extends MessagingServiceImplBase {
+ private Log logger = LogFactory.getLog(GrpcMessagingServiceImpl.class);
+
private final FunctionInvocationWrapper function;
GrpcMessagingServiceImpl(FunctionProperties funcProperties, FunctionCatalog functionCatalog) {
this.function = functionCatalog.lookup(funcProperties.getDefinition(), "application/json");
+ Assert.notNull(this.function, "Failed to lookup function " + funcProperties.getDefinition());
}
-
@Override
+ @SuppressWarnings("unchecked")
public void requestReply(GrpcMessage request, StreamObserver responseObserver) {
Message message = GrpcUtils.fromGrpcMessage(request);
+
Message replyMessage = (Message) this.function.apply(message);
GrpcMessage reply = GrpcUtils.toGrpcMessage(replyMessage);
- /*
- * The above is effectively echo. This is where we plug in function invocation
- */
+
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@@ -74,9 +90,61 @@ class GrpcMessagingServiceImpl extends MessagingServiceImplBase {
// return null;
// }
//
-// @Override
-// public StreamObserver biStream(
-// StreamObserver responseObserver) {
-// return null;
-// }
+ @Override
+ @SuppressWarnings("unchecked")
+ public StreamObserver biStream(StreamObserver responseObserver) {
+ ServerCallStreamObserver serverCallStreamObserver = (ServerCallStreamObserver) responseObserver;
+ serverCallStreamObserver.disableAutoInboundFlowControl();
+
+ AtomicBoolean wasReady = new AtomicBoolean(false);
+ serverCallStreamObserver.setOnReadyHandler(() -> {
+ if (serverCallStreamObserver.isReady() && !wasReady.get()) {
+ wasReady.set(true);
+ logger.info("Server stream is ready");
+ serverCallStreamObserver.request(1);
+ }
+ });
+ return new StreamObserver() {
+
+ @Override
+ public void onNext(GrpcMessage request) {
+ try {
+ Message message = GrpcUtils.fromGrpcMessage(request);
+
+ Message replyMessage = (Message) function
+ .apply(message);
+
+ GrpcMessage reply = GrpcUtils.toGrpcMessage(replyMessage);
+
+ responseObserver.onNext(reply);
+
+ // Check the provided ServerCallStreamObserver to see if it is still
+ // ready to accept more messages.
+ if (serverCallStreamObserver.isReady()) {
+ serverCallStreamObserver.request(1);
+ }
+ else {
+ wasReady.set(false);
+ }
+ }
+ catch (Throwable throwable) {
+ throwable.printStackTrace();
+ responseObserver.onError(
+ Status.UNKNOWN.withDescription("Error handling request").withCause(throwable).asException());
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ t.printStackTrace();
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onCompleted() {
+ logger.info("Server Stream is complete");
+ responseObserver.onCompleted();
+ }
+ };
+ }
}
diff --git a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcServer.java b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcServer.java
index 93a53c857..2a4dd529a 100644
--- a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcServer.java
+++ b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcServer.java
@@ -29,7 +29,7 @@ import org.springframework.context.SmartLifecycle;
class GrpcServer implements SmartLifecycle {
- protected Log logger = LogFactory.getLog(GrpcServer.class);
+ private Log logger = LogFactory.getLog(GrpcServer.class);
private final FunctionGrpcProperties grpcProperties;
diff --git a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java
index c52e33bc0..4f8b8e009 100644
--- a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java
+++ b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java
@@ -22,6 +22,13 @@ import java.util.Map;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Many;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
@@ -34,6 +41,8 @@ import org.springframework.messaging.support.MessageBuilder;
*/
public final class GrpcUtils {
+ private static Log logger = LogFactory.getLog(GrpcUtils.class);
+
private GrpcUtils() {
}
@@ -64,7 +73,7 @@ public final class GrpcUtils {
}
public static Message requestReply(String host, int port, Message inputMessage) {
- ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port)
+ ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext().build();
MessagingServiceGrpc.MessagingServiceBlockingStub stub = MessagingServiceGrpc
.newBlockingStub(channel);
@@ -73,4 +82,98 @@ public final class GrpcUtils {
channel.shutdown();
return fromGrpcMessage(response);
}
+
+ /**
+ * Utility method to support bi-directional streaming interaction. Will connect to gRPC server using default host/port,
+ * otherwise use {@link #biStreaming(String, int, Flux)} method.
+ *
+ * Keep in mind that there is no implied relationship between input stream and output stream.
+ * They are completely independent where one may end before the other.
+ *
+ * @param inputStream {@code FluxMessage>} representing input stream.
+ * @return {@code FluxMessage>} representing output stream
+ */
+ public static Flux> biStreaming(Flux> inputStream) {
+ return biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, inputStream);
+ }
+
+ /**
+ * Utility method to support bi-directional streaming interaction.
+ * Keep in mind that there is no implied relationship between input stream and output stream.
+ * They are completely independent where one may end before the other.
+ *
+ * @param host gRPC server host name
+ * @param port gRPC server port
+ * @param inputStream {@code FluxMessage>} representing input stream
+ * @return {@code FluxMessage>} representing output stream
+ */
+ public static Flux> biStreaming(String host, int port, Flux> inputStream) {
+ ManagedChannel channel = ManagedChannelBuilder
+ .forAddress(host, port)
+ .usePlaintext().build();
+ MessagingServiceGrpc.MessagingServiceStub stub = MessagingServiceGrpc
+ .newStub(channel);
+ Many> sink = Sinks.many().unicast().onBackpressureBuffer();
+
+ ClientResponseObserver clientResponseObserver = clientResponseObserver(inputStream, sink);
+
+ stub.biStream(clientResponseObserver);
+
+ return sink.asFlux().doOnComplete(() -> {
+ logger.debug("Shutting down channel");
+ channel.shutdown();
+ });
+ }
+
+ private static ClientResponseObserver clientResponseObserver(Flux> inputStream, Many> sink) {
+ return new ClientResponseObserver() {
+
+ ClientCallStreamObserver requestStreamObserver;
+
+ @Override
+ public void beforeStart(ClientCallStreamObserver requestStreamObserver) {
+ this.requestStreamObserver = requestStreamObserver;
+ requestStreamObserver.disableAutoInboundFlowControl();
+
+ requestStreamObserver.setOnReadyHandler(new Runnable() {
+ @Override
+ public void run() {
+ inputStream
+ .doOnNext(request -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending message: " + request);
+ }
+ requestStreamObserver.onNext(GrpcUtils.toGrpcMessage(request));
+ })
+ .doOnComplete(() -> {
+ requestStreamObserver.onCompleted();
+ })
+ .subscribe();
+ }
+ });
+ }
+
+ @Override
+ public void onNext(GrpcMessage message) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Receiving message: " + message);
+ }
+ sink.tryEmitNext(fromGrpcMessage(message));
+ requestStreamObserver.request(1);
+
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ t.printStackTrace();
+ }
+
+ @Override
+ public void onCompleted() {
+ logger.info("Client stream is complete");
+ sink.tryEmitComplete(); // TODO revisit as this would complete the server stream simply because the client is done.
+ // Perhaps we need to expose some boolean value when this is desirable
+ }
+ };
+ }
}
diff --git a/spring-cloud-function-grpc/src/test/java/org/springframework/cloud/function/grpc/GrpcInteractionTests.java b/spring-cloud-function-grpc/src/test/java/org/springframework/cloud/function/grpc/GrpcInteractionTests.java
index aa03b5a45..022da1a02 100644
--- a/spring-cloud-function-grpc/src/test/java/org/springframework/cloud/function/grpc/GrpcInteractionTests.java
+++ b/spring-cloud-function-grpc/src/test/java/org/springframework/cloud/function/grpc/GrpcInteractionTests.java
@@ -16,11 +16,13 @@
package org.springframework.cloud.function.grpc;
-
-
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -34,18 +36,23 @@ import org.springframework.util.MimeTypeUtils;
import static org.assertj.core.api.Assertions.assertThat;
+/**
+ *
+ * @author Oleg Zhurakousky
+ *
+ */
public class GrpcInteractionTests {
@Test
- public void test() {
+ public void testRequestReply() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercase",
- "--spring.cloud.function.grpc.port=55555",
+ "--spring.cloud.function.grpc.port=" + FunctionGrpcProperties.GRPC_PORT,
"--spring.cloud.function.grpc.mode=server")) {
- Message message = MessageBuilder.withPayload("hello gRPC".getBytes())
+ Message message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
.setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build();
@@ -56,6 +63,38 @@ public class GrpcInteractionTests {
}
}
+ @Test
+ public void testBidirectionalStream() {
+ try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
+ SampleConfiguration.class).web(WebApplicationType.NONE).run(
+ "--spring.jmx.enabled=false",
+ "--spring.cloud.function.definition=uppercase",
+ "--spring.cloud.function.grpc.port="
+ + FunctionGrpcProperties.GRPC_PORT,
+ "--spring.cloud.function.grpc.mode=server")) {
+
+ List> messages = new ArrayList<>();
+ messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
+ .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
+ .build());
+ messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
+ .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
+ .build());
+ messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
+ .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
+ .build());
+
+ Flux> clientResponseObserver =
+ GrpcUtils.biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages));
+
+ List> results = clientResponseObserver.collectList().block(Duration.ofSeconds(1));
+ assertThat(results.size()).isEqualTo(3);
+ assertThat(results.get(0).getPayload()).isEqualTo("\"RICKY\"".getBytes());
+ assertThat(results.get(1).getPayload()).isEqualTo("\"JULIEN\"".getBytes());
+ assertThat(results.get(2).getPayload()).isEqualTo("\"BUBBLES\"".getBytes());
+ }
+ }
+
@EnableAutoConfiguration
public static class SampleConfiguration {