Add support for returning Mono for gRPC requestReply

Resolves #776
This commit is contained in:
Oleg Zhurakousky
2021-12-07 15:02:40 +01:00
parent 14cfa0aa4f
commit a174ab34b8
3 changed files with 87 additions and 7 deletions

View File

@@ -27,6 +27,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -40,6 +41,7 @@ import org.springframework.util.MimeTypeUtils;
import org.springframework.util.SocketUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
/**
*
@@ -79,6 +81,49 @@ public class GrpcInteractionTests {
}
}
@Test
public void testRequestReplyWithMonoReturn() {
int port = SocketUtils.findAvailableTcpPort();
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercaseMonoReturn",
"--spring.cloud.function.grpc.port=" + port)) {
Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
.setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build();
Message<byte[]> reply = GrpcUtils.requestReply("localhost", port, message);
assertThat(reply.getPayload()).isEqualTo("\"HELLO GRPC\"".getBytes());
}
}
@Test
public void testRequestReplyWithFluxReturn() {
int port = SocketUtils.findAvailableTcpPort();
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercaseFluxReturn",
"--spring.cloud.function.grpc.port=" + port)) {
Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
.setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build();
try {
GrpcUtils.requestReply("localhost", port, message);
fail();
}
catch (Exception e) {
assertThat(e.getMessage()).contains("Flux reply is not supported for `requestReply` mode");
}
}
}
@Test
public void testRequstReplyFunctionDefinitionInMessage() {
int port = SocketUtils.findAvailableTcpPort();
@@ -263,6 +308,16 @@ public class GrpcInteractionTests {
return v -> v.toUpperCase();
}
@Bean
public Function<String, Mono<String>> uppercaseMonoReturn() {
return v -> Mono.just(v.toUpperCase());
}
@Bean
public Function<String, Flux<String>> uppercaseFluxReturn() {
return v -> Flux.just(v.toUpperCase(), v.toUpperCase() + "-1", v.toUpperCase() + "-2");
}
@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();