Polish gRPC lifecycle

This commit is contained in:
Oleg Zhurakousky
2021-10-18 20:20:33 +02:00
parent 56a75cd571
commit 6eb85ca5e1
3 changed files with 43 additions and 27 deletions

View File

@@ -83,12 +83,16 @@ final class GrpcUtils {
public static Message<byte[]> requestReply(String host, int port, Message<byte[]> inputMessage) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext().build();
MessagingServiceGrpc.MessagingServiceBlockingStub stub = MessagingServiceGrpc
.newBlockingStub(channel);
try {
MessagingServiceGrpc.MessagingServiceBlockingStub stub = MessagingServiceGrpc
.newBlockingStub(channel);
GrpcSpringMessage response = stub.requestReply(toGrpcSpringMessage(inputMessage));
channel.shutdown();
return fromGrpcSpringMessage(response);
GrpcSpringMessage response = stub.requestReply(toGrpcSpringMessage(inputMessage));
return fromGrpcSpringMessage(response);
}
finally {
channel.shutdownNow();
}
}
/**
@@ -129,7 +133,11 @@ final class GrpcUtils {
return sink.asFlux().doOnComplete(() -> {
logger.debug("Shutting down channel");
channel.shutdown();
channel.shutdownNow();
})
.doOnError(e -> {
e.printStackTrace();
channel.shutdownNow();
});
}
@@ -151,10 +159,14 @@ final class GrpcUtils {
sink.tryEmitComplete();
});
return sink.asFlux()
.doOnComplete(() -> {
channel.shutdown();
channel.shutdownNow();
executor.shutdownNow();
})
.doOnError(e -> {
e.printStackTrace();
channel.shutdownNow();
executor.shutdownNow();
});
}
@@ -196,11 +208,13 @@ final class GrpcUtils {
@Override
public void onError(Throwable t) {
t.printStackTrace();
channel.shutdownNow();
}
@Override
public void onCompleted() {
logger.info("Client completed");
channel.shutdownNow();
}
};
@@ -220,7 +234,11 @@ final class GrpcUtils {
}
}).doOnComplete(() -> {
requestObserver.onCompleted();
}).subscribe();
}).doOnError(e -> {
e.printStackTrace();
channel.shutdownNow();
})
.subscribe();
try {
return resultRef.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
@@ -229,6 +247,9 @@ final class GrpcUtils {
Thread.currentThread().interrupt();
throw new IllegalStateException(ie);
}
finally {
channel.shutdownNow();
}
}
private static ClientResponseObserver<GrpcSpringMessage, GrpcSpringMessage> clientResponseObserver(Flux<Message<byte[]>> inputStream, Many<Message<byte[]>> sink) {

View File

@@ -185,18 +185,26 @@ public class MessageHandlingHelper<T extends GeneratedMessageV3> implements Smar
if (function.isOutputTypePublisher()) {
return this.biStreamReactive(responseObserver, serverCallStreamObserver, grpcMessageType);
}
throw new UnsupportedOperationException("The bi-directional streaming is "
UnsupportedOperationException ex = new UnsupportedOperationException("The bi-directional streaming is "
+ "not supported for functions that accept Publisher but return non-Publisher: "
+ function);
// responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request")
// .withCause(ex).asException());
responseObserver.onCompleted();
throw ex;
}
else {
if (!function.isOutputTypePublisher()) {
return this.biStreamImperative(responseObserver, serverCallStreamObserver, wasReady);
}
throw new UnsupportedOperationException("The bidirection streaming is "
UnsupportedOperationException ex = new UnsupportedOperationException("The bidirection streaming is "
+ "not supported for functions that accept non-Publisher but return Publisher: "
+ function);
// responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request")
// .withCause(ex).asException());
responseObserver.onCompleted();
throw ex;
}
}

View File

@@ -39,7 +39,6 @@ import org.springframework.util.MimeTypeUtils;
import org.springframework.util.SocketUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
/**
*
@@ -225,13 +224,7 @@ public class GrpcInteractionTests {
Flux<Message<byte[]>> clientResponseObserver =
GrpcUtils.biStreaming("localhost", port, Flux.fromIterable(messages));
try {
clientResponseObserver.collectList().block(Duration.ofSeconds(1));
fail();
}
catch (Exception e) {
// TODO: handle exception
}
assertThat(clientResponseObserver.collectList().block(Duration.ofSeconds(2))).isEmpty();
}
}
@@ -256,13 +249,7 @@ public class GrpcInteractionTests {
Flux<Message<byte[]>> clientResponseObserver =
GrpcUtils.biStreaming("localhost", port, Flux.fromIterable(messages));
try {
clientResponseObserver.collectList().block(Duration.ofSeconds(1));
fail();
}
catch (Exception e) {
// TODO: handle exception
}
assertThat(clientResponseObserver.collectList().block(Duration.ofSeconds(2))).isEmpty();
}
}