From cd4eafdde621a3304b1d32ff69432b11eb1cd190 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Fri, 24 Sep 2021 09:04:40 +0200 Subject: [PATCH] GH-744 Add guard conditions for unsupported functions for biStreaming --- .../grpc/GrpcServerMessageHandler.java | 17 +++- .../function/grpc/GrpcInteractionTests.java | 79 +++++++++++++++++++ 2 files changed, 93 insertions(+), 3 deletions(-) diff --git a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcServerMessageHandler.java b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcServerMessageHandler.java index b90b6b798..0b5dbd2c9 100644 --- a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcServerMessageHandler.java +++ b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcServerMessageHandler.java @@ -107,11 +107,22 @@ class GrpcServerMessageHandler extends MessagingServiceImplBase { } }); - if (function.isInputTypePublisher()) { - return this.biStreamReactive(responseObserver, serverCallStreamObserver); + if (this.function.isInputTypePublisher()) { + if (this.function.isOutputTypePublisher()) { + return this.biStreamReactive(responseObserver, serverCallStreamObserver); + } + throw new UnsupportedOperationException("The bi-directional streaming is " + + "not supported for functions that accept Publisher but return non-Publisher: " + + this.function); } else { - return this.biStreamImperative(responseObserver, serverCallStreamObserver, wasReady); + if (!this.function.isOutputTypePublisher()) { + return this.biStreamImperative(responseObserver, serverCallStreamObserver, wasReady); + } + throw new UnsupportedOperationException("The bidirection streaming is " + + "not supported for functions that accept non-Publisher but return Publisher: " + + this.function); + } } diff --git a/spring-cloud-function-grpc/src/test/java/org/springframework/cloud/function/grpc/GrpcInteractionTests.java b/spring-cloud-function-grpc/src/test/java/org/springframework/cloud/function/grpc/GrpcInteractionTests.java index 4c4ac990b..5b65b3916 100644 --- a/spring-cloud-function-grpc/src/test/java/org/springframework/cloud/function/grpc/GrpcInteractionTests.java +++ b/spring-cloud-function-grpc/src/test/java/org/springframework/cloud/function/grpc/GrpcInteractionTests.java @@ -35,6 +35,7 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.MimeTypeUtils; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; /** * @@ -127,6 +128,74 @@ public class GrpcInteractionTests { } } + @Test + public void testStreamInStringOut() { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + SampleConfiguration.class).web(WebApplicationType.NONE).run( + "--spring.jmx.enabled=false", + "--spring.cloud.function.definition=streamInStringOut", + "--spring.cloud.function.grpc.port=" + + FunctionGrpcProperties.GRPC_PORT, + "--spring.cloud.function.grpc.mode=server")) { + + List> messages = new ArrayList<>(); + messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar") + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN) + .build()); + messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar") + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN) + .build()); + messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar") + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN) + .build()); + + Flux> clientResponseObserver = + GrpcUtils.biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages)); + + try { + clientResponseObserver.collectList().block(Duration.ofSeconds(1)); + fail(); + } + catch (Exception e) { + // TODO: handle exception + } + } + } + + @Test + public void testStringInStreamOut() { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + SampleConfiguration.class).web(WebApplicationType.NONE).run( + "--spring.jmx.enabled=false", + "--spring.cloud.function.definition=stringInStreamOut", + "--spring.cloud.function.grpc.port=" + + FunctionGrpcProperties.GRPC_PORT, + "--spring.cloud.function.grpc.mode=server")) { + + List> messages = new ArrayList<>(); + messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar") + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN) + .build()); + messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar") + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN) + .build()); + messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar") + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN) + .build()); + + Flux> clientResponseObserver = + GrpcUtils.biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages)); + + try { + clientResponseObserver.collectList().block(Duration.ofSeconds(1)); + fail(); + } + catch (Exception e) { + // TODO: handle exception + } + } + } + @EnableAutoConfiguration public static class SampleConfiguration { @@ -139,5 +208,15 @@ public class GrpcInteractionTests { public Function, Flux> uppercaseReactive() { return flux -> flux.map(v -> v.toUpperCase()); } + + @Bean + public Function, String> streamInStringOut() { + return flux -> "hello"; + } + + @Bean + public Function> stringInStreamOut() { + return value -> Flux.just(value); + } } }