Add support for returning Mono for gRPC requestReply

Resolves #776
This commit is contained in:
Oleg Zhurakousky
2021-12-07 15:02:40 +01:00
parent 2924777d34
commit dfc700e2c9
3 changed files with 87 additions and 7 deletions

View File

@@ -87,8 +87,13 @@ public final class GrpcUtils {
MessagingServiceGrpc.MessagingServiceBlockingStub stub = MessagingServiceGrpc
.newBlockingStub(channel);
GrpcSpringMessage response = stub.requestReply(toGrpcSpringMessage(inputMessage));
return fromGrpcSpringMessage(response);
try {
GrpcSpringMessage response = stub.requestReply(toGrpcSpringMessage(inputMessage));
return fromGrpcSpringMessage(response);
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
finally {
channel.shutdownNow();

View File

@@ -24,12 +24,14 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessageV3;
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
@@ -38,6 +40,7 @@ import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
@@ -75,12 +78,29 @@ public class MessageHandlingHelper<T extends GeneratedMessageV3> implements Smar
public void requestReply(T request, StreamObserver<T> responseObserver) {
Message<byte[]> message = this.toSpringMessage(request);
FunctionInvocationWrapper function = this.resolveFunction(message.getHeaders());
if (FunctionTypeUtils.isFlux(function.getOutputType())) {
String errorMessage = "Flux reply is not supported for `requestReply` mode";
responseObserver.onError(Status.UNKNOWN.withDescription(errorMessage)
.withCause(new UnsupportedOperationException(errorMessage)).asRuntimeException());
return;
}
Message<byte[]> replyMessage = (Message<byte[]>) function.apply(message);
GeneratedMessageV3 reply = this.toGrpcMessage(replyMessage, (Class<T>) request.getClass());
responseObserver.onNext((T) reply);
responseObserver.onCompleted();
Object replyMessage = function.apply(message);
if (replyMessage instanceof Message<?>) {
GeneratedMessageV3 reply = this.toGrpcMessage((Message<byte[]>) replyMessage, (Class<T>) request.getClass());
responseObserver.onNext((T) reply);
responseObserver.onCompleted();
}
else if (replyMessage instanceof Publisher<?>) {
if (replyMessage instanceof Mono<?>) {
Mono.from((Publisher<?>) replyMessage).doOnNext(reply -> {
GeneratedMessageV3 replyGrps = this.toGrpcMessage((Message<byte[]>) reply, (Class<T>) request.getClass());
responseObserver.onNext((T) replyGrps);
responseObserver.onCompleted();
})
.subscribe();
}
}
}
@SuppressWarnings("unchecked")

View File

@@ -27,6 +27,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -40,6 +41,7 @@ import org.springframework.util.MimeTypeUtils;
import org.springframework.util.SocketUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
/**
*
@@ -79,6 +81,49 @@ public class GrpcInteractionTests {
}
}
@Test
public void testRequestReplyWithMonoReturn() {
int port = SocketUtils.findAvailableTcpPort();
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercaseMonoReturn",
"--spring.cloud.function.grpc.port=" + port)) {
Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
.setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build();
Message<byte[]> reply = GrpcUtils.requestReply("localhost", port, message);
assertThat(reply.getPayload()).isEqualTo("\"HELLO GRPC\"".getBytes());
}
}
@Test
public void testRequestReplyWithFluxReturn() {
int port = SocketUtils.findAvailableTcpPort();
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercaseFluxReturn",
"--spring.cloud.function.grpc.port=" + port)) {
Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
.setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build();
try {
GrpcUtils.requestReply("localhost", port, message);
fail();
}
catch (Exception e) {
assertThat(e.getMessage()).contains("Flux reply is not supported for `requestReply` mode");
}
}
}
@Test
public void testRequstReplyFunctionDefinitionInMessage() {
int port = SocketUtils.findAvailableTcpPort();
@@ -263,6 +308,16 @@ public class GrpcInteractionTests {
return v -> v.toUpperCase();
}
@Bean
public Function<String, Mono<String>> uppercaseMonoReturn() {
return v -> Mono.just(v.toUpperCase());
}
@Bean
public Function<String, Flux<String>> uppercaseFluxReturn() {
return v -> Flux.just(v.toUpperCase(), v.toUpperCase() + "-1", v.toUpperCase() + "-2");
}
@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();