GH-744 Add guard conditions for unsupported functions for biStreaming

This commit is contained in:
Oleg Zhurakousky
2021-09-24 09:04:40 +02:00
parent 384cd8509b
commit cd4eafdde6
2 changed files with 93 additions and 3 deletions

View File

@@ -107,11 +107,22 @@ class GrpcServerMessageHandler extends MessagingServiceImplBase {
}
});
if (function.isInputTypePublisher()) {
return this.biStreamReactive(responseObserver, serverCallStreamObserver);
if (this.function.isInputTypePublisher()) {
if (this.function.isOutputTypePublisher()) {
return this.biStreamReactive(responseObserver, serverCallStreamObserver);
}
throw new UnsupportedOperationException("The bi-directional streaming is "
+ "not supported for functions that accept Publisher but return non-Publisher: "
+ this.function);
}
else {
return this.biStreamImperative(responseObserver, serverCallStreamObserver, wasReady);
if (!this.function.isOutputTypePublisher()) {
return this.biStreamImperative(responseObserver, serverCallStreamObserver, wasReady);
}
throw new UnsupportedOperationException("The bidirection streaming is "
+ "not supported for functions that accept non-Publisher but return Publisher: "
+ this.function);
}
}

View File

@@ -35,6 +35,7 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
/**
*
@@ -127,6 +128,74 @@ public class GrpcInteractionTests {
}
}
@Test
public void testStreamInStringOut() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=streamInStringOut",
"--spring.cloud.function.grpc.port="
+ FunctionGrpcProperties.GRPC_PORT,
"--spring.cloud.function.grpc.mode=server")) {
List<Message<byte[]>> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
Flux<Message<byte[]>> clientResponseObserver =
GrpcUtils.biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages));
try {
clientResponseObserver.collectList().block(Duration.ofSeconds(1));
fail();
}
catch (Exception e) {
// TODO: handle exception
}
}
}
@Test
public void testStringInStreamOut() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=stringInStreamOut",
"--spring.cloud.function.grpc.port="
+ FunctionGrpcProperties.GRPC_PORT,
"--spring.cloud.function.grpc.mode=server")) {
List<Message<byte[]>> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
Flux<Message<byte[]>> clientResponseObserver =
GrpcUtils.biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages));
try {
clientResponseObserver.collectList().block(Duration.ofSeconds(1));
fail();
}
catch (Exception e) {
// TODO: handle exception
}
}
}
@EnableAutoConfiguration
public static class SampleConfiguration {
@@ -139,5 +208,15 @@ public class GrpcInteractionTests {
public Function<Flux<String>, Flux<String>> uppercaseReactive() {
return flux -> flux.map(v -> v.toUpperCase());
}
@Bean
public Function<Flux<String>, String> streamInStringOut() {
return flux -> "hello";
}
@Bean
public Function<String, Flux<String>> stringInStreamOut() {
return value -> Flux.just(value);
}
}
}