GH-744 Add initial support for server-side streaming to gRPC

This commit is contained in:
Oleg Zhurakousky
2021-09-24 16:36:54 +02:00
parent bc3a4ea7ca
commit 0663bb75b4
3 changed files with 67 additions and 15 deletions

View File

@@ -118,10 +118,10 @@ public class GrpcInteractionTests {
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
Flux<Message<byte[]>> clientResponseObserver =
Flux<Message<byte[]>> resultStream =
GrpcUtils.biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages));
List<Message<byte[]>> results = clientResponseObserver.collectList().block(Duration.ofSeconds(5));
List<Message<byte[]>> results = resultStream.collectList().block(Duration.ofSeconds(5));
assertThat(results.size()).isEqualTo(3);
assertThat(results.get(0).getPayload()).isEqualTo("\"RICKY\"".getBytes());
assertThat(results.get(1).getPayload()).isEqualTo("\"JULIEN\"".getBytes());
@@ -154,6 +154,28 @@ public class GrpcInteractionTests {
}
}
@Test
public void testServerStreaming() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=stringInStreamOut",
"--spring.cloud.function.grpc.port="
+ FunctionGrpcProperties.GRPC_PORT,
"--spring.cloud.function.grpc.mode=server")) {
Message<byte[]> message = MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar").build();
Flux<Message<byte[]>> reply =
GrpcUtils.serverStream("localhost", FunctionGrpcProperties.GRPC_PORT, message);
List<Message<byte[]>> results = reply.collectList().block(Duration.ofSeconds(5));
assertThat(results.size()).isEqualTo(2);
assertThat(results.get(0).getPayload()).isEqualTo("\"Ricky\"".getBytes());
assertThat(results.get(1).getPayload()).isEqualTo("\"RICKY\"".getBytes());
}
}
@Test
public void testBiStreamStreamInStringOutFailure() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
@@ -166,13 +188,10 @@ public class GrpcInteractionTests {
List<Message<byte[]>> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
Flux<Message<byte[]>> clientResponseObserver =
@@ -200,13 +219,10 @@ public class GrpcInteractionTests {
List<Message<byte[]>> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
Flux<Message<byte[]>> clientResponseObserver =
@@ -249,7 +265,7 @@ public class GrpcInteractionTests {
@Bean
public Function<String, Flux<String>> stringInStreamOut() {
return value -> Flux.just(value);
return value -> Flux.just(value, value.toUpperCase());
}
}
}