
Introduction

Spring Cloud Function allows you to invoke functions via gRPC. You can read more about gRPC in the gRPC documentation; this section describes only the parts relevant to the Spring Cloud Function integration.

As with other Spring Boot based frameworks, all you need to do is add the spring-cloud-function-grpc dependency to your POM.

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-function-grpc</artifactId>
	<version>${current.version}</version>
</dependency>

Programming model

Two operation modes (client/server)

Spring Cloud Function gRPC support provides two modes of operation: client and server. When you add the spring-cloud-function-grpc dependency to your POM, you may or may not want the gRPC server, since you may only be interested in the client-side utilities for invoking a function exposed via a gRPC server running on some host/port. To support both modes, Spring Cloud Function provides the spring.cloud.function.grpc.server property, which defaults to true. The default mode of operation is therefore server, since the core intention of the current gRPC support is to expose user Functions via gRPC. However, if you're only interested in the client-side utilities (e.g., GrpcUtils, which helps invoke a function or convert a GrpcMessage to a Spring Message and vice versa), you can set this property to false.

In the server (default) mode, the gRPC server is bound to the default port 6048. You can change it by providing the spring.cloud.function.grpc.port property.
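
As a minimal sketch of the two properties described above, an application.properties file for a client-only setup might look like the following. The port value 9090 is an arbitrary example rather than a recommended setting, and the port only matters when the server is enabled.

```properties
# Disable the embedded gRPC server and use only the client-side utilities (default: true)
spring.cloud.function.grpc.server=false

# Alternatively, keep the server enabled and bind it to a non-default port (default: 6048)
# spring.cloud.function.grpc.port=9090
```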

Core Data and Service

At the center of the gRPC and Spring Cloud Function integration is a canonical protobuf structure, GrpcMessage. It is modeled after Spring Message.

message GrpcMessage {
    bytes payload = 1;
    map<string, string> headers = 2;
}

As you can see, it is a very generic structure that can carry any type of data and metadata you wish to exchange.
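
To make the shape concrete from the Java side, here is a plain-Java sketch that mirrors the two fields of GrpcMessage. SimpleGrpcMessage and its methods are hypothetical names used only for illustration; this is not the class generated from the protobuf definition, and UTF-8 is an assumed payload encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; NOT the protobuf-generated GrpcMessage class.
final class SimpleGrpcMessage {

    private final byte[] payload;               // mirrors: bytes payload = 1
    private final Map<String, String> headers;  // mirrors: map<string, string> headers = 2

    SimpleGrpcMessage(byte[] payload, Map<String, String> headers) {
        this.payload = payload.clone();
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
    }

    // Encodes text as UTF-8 bytes (assumed encoding) and stringifies every header value,
    // because the headers map only carries string-to-string entries.
    static SimpleGrpcMessage ofText(String text, Map<String, ?> rawHeaders) {
        Map<String, String> headers = new HashMap<>();
        for (Map.Entry<String, ?> entry : rawHeaders.entrySet()) {
            headers.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
        return new SimpleGrpcMessage(text.getBytes(StandardCharsets.UTF_8), headers);
    }

    byte[] payload() {
        return payload.clone();
    }

    Map<String, String> headers() {
        return headers;
    }

    // Decodes the payload back to text using the same assumed encoding.
    String payloadAsText() {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

Because headers is a string-to-string map, non-string metadata such as numbers has to travel in its string form, and the receiver needs to know how the payload bytes were encoded.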

The same protobuf definition also declares a MessagingService, allowing you to generate the required stubs to support the truly polyglot nature of gRPC.

service MessagingService {
    rpc biStream(stream GrpcMessage) returns (stream GrpcMessage);
    
    rpc clientStream(stream GrpcMessage) returns (GrpcMessage);
    
    rpc serverStream(GrpcMessage) returns (stream GrpcMessage);
    
    rpc requestReply(GrpcMessage) returns (GrpcMessage);
}

That said, when using Java you do not need to generate anything; you only need to identify the function definition and then send and receive Spring Messages. You can get a pretty good idea from this test case.

4 Interaction RPC Modes

gRPC provides four interaction modes:

  • Request/Reply RPC
  • Server-side streaming RPC
  • Client-side streaming RPC
  • Bi-directional streaming RPC

Spring Cloud Function provides support for all 4 of them.

Request Reply RPC

The most straightforward interaction mode is Request/Reply. Suppose you have a function:

@EnableAutoConfiguration
public static class SampleConfiguration {
	@Bean
	public Function<String, String> uppercase() {
		return v -> v.toUpperCase();
	}
}

After identifying this function via the spring.cloud.function.definition property (see example here), you can invoke it using the utility methods provided in the GrpcUtils class:

Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
			.setHeader("foo", "bar")
			.build();
Message<byte[]> reply = GrpcUtils.requestReply(message);

You can also provide the spring.cloud.function.definition property via Message headers to support more dynamic cases:

Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
			.setHeader("foo", "bar")
			.setHeader("spring.cloud.function.definition", "reverse")
			.build();
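
The header-based selection can be pictured with a small plain-Java sketch. FunctionDefinitionLookup is a hypothetical helper, not part of Spring Cloud Function, and it assumes that a definition header, when present, takes precedence over the configured property; the framework's actual resolution logic may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: a hypothetical helper, not an API of Spring Cloud Function.
final class FunctionDefinitionLookup {

    static final String DEFINITION_KEY = "spring.cloud.function.definition";

    // Assumed lookup order: a non-empty per-message header wins,
    // otherwise the statically configured definition is used.
    static String resolve(Map<String, String> headers, String configuredDefinition) {
        String fromHeader = headers.get(DEFINITION_KEY);
        return (fromHeader != null && !fromHeader.isEmpty()) ? fromHeader : configuredDefinition;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("foo", "bar");
        System.out.println(resolve(headers, "uppercase")); // prints "uppercase"

        headers.put(DEFINITION_KEY, "reverse");
        System.out.println(resolve(headers, "uppercase")); // prints "reverse"
    }
}
```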

Server-side streaming RPC

Server-side streaming RPC allows you to reply with a stream of data.

@EnableAutoConfiguration
public static class SampleConfiguration {
	@Bean
	public Function<String, Flux<String>> stringInStreamOut() {
		return value -> Flux.just(value, value.toUpperCase());
	}
}

After identifying this function via the spring.cloud.function.definition property (see example here), you can invoke it using the utility methods provided in the GrpcUtils class:

Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes()).setHeader("foo", "bar").build();

Flux<Message<byte[]>> reply =
		GrpcUtils.serverStream("localhost", FunctionGrpcProperties.GRPC_PORT, message);

List<Message<byte[]>> results = reply.collectList().block(Duration.ofSeconds(5));

As you can see, the gRPC stream is mapped to an instance of Flux from Project Reactor.

As with request/reply, you can also provide the spring.cloud.function.definition property via Message headers to support more dynamic cases:

Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
			.setHeader("foo", "bar")
			.setHeader("spring.cloud.function.definition", "reverse")
			.build();

Client-side streaming RPC

The Client-side streaming RPC allows you to stream input data and receive a single reply.

@EnableAutoConfiguration
public static class SampleConfiguration {
	@Bean
	public Function<Flux<String>, String> streamInStringOut() {
		return flux -> flux.doOnNext(v -> {
			try {
				// do something useful
				Thread.sleep(new Random().nextInt(2000)); // artificial delay
			}
			catch (Exception e) {
				// ignore
			}
		}).collectList().block().toString();
	}
}

After identifying this function via the spring.cloud.function.definition property (see example here), you can invoke it using the utility methods provided in the GrpcUtils class:

List<Message<byte[]>> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
		.build());
messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
		.build());
messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
		.build());

Message<byte[]> reply =
		GrpcUtils.clientStream("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages));

As you can see, the gRPC stream is mapped to an instance of Flux from Project Reactor.

Unlike request/reply and server-side streaming, here you can pass the function definition only via a property or an environment variable, not via Message headers.
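
For illustration, and assuming the streamInStringOut function from the example above, the definition could be supplied in application.properties as sketched here:

```properties
# Selects the function that the gRPC server exposes
spring.cloud.function.definition=streamInStringOut
```

With Spring Boot's relaxed binding, the equivalent environment variable is SPRING_CLOUD_FUNCTION_DEFINITION=streamInStringOut.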

Bi-Directional streaming RPC

The bi-directional streaming RPC allows you to stream input and output data.

@EnableAutoConfiguration
public static class SampleConfiguration {
	@Bean
	public Function<Flux<String>, Flux<String>> uppercaseReactive() {
		return flux -> flux.map(v -> v.toUpperCase());
	}
}

After identifying this function via the spring.cloud.function.definition property (see example here), you can invoke it using the utility methods provided in the GrpcUtils class:

List<Message<byte[]>> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
		.build());
messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
		.build());
messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
		.build());

Flux<Message<byte[]>> clientResponseObserver =
		GrpcUtils.biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages));

List<Message<byte[]>> results = clientResponseObserver.collectList().block(Duration.ofSeconds(1));

As you can see, the gRPC stream is mapped to an instance of Flux from Project Reactor.

Unlike request/reply and server-side streaming, here you can pass the function definition only via a property or an environment variable, not via Message headers.