diff --git a/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java b/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java index d02b6aab6..b267e9c82 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java +++ b/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java @@ -83,12 +83,16 @@ final class GrpcUtils { public static Message requestReply(String host, int port, Message inputMessage) { ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port) .usePlaintext().build(); - MessagingServiceGrpc.MessagingServiceBlockingStub stub = MessagingServiceGrpc - .newBlockingStub(channel); + try { + MessagingServiceGrpc.MessagingServiceBlockingStub stub = MessagingServiceGrpc + .newBlockingStub(channel); - GrpcSpringMessage response = stub.requestReply(toGrpcSpringMessage(inputMessage)); - channel.shutdown(); - return fromGrpcSpringMessage(response); + GrpcSpringMessage response = stub.requestReply(toGrpcSpringMessage(inputMessage)); + return fromGrpcSpringMessage(response); + } + finally { + channel.shutdownNow(); + } } /** @@ -129,7 +133,11 @@ final class GrpcUtils { return sink.asFlux().doOnComplete(() -> { logger.debug("Shutting down channel"); - channel.shutdown(); + channel.shutdownNow(); + }) + .doOnError(e -> { + e.printStackTrace(); + channel.shutdownNow(); }); } @@ -151,10 +159,14 @@ final class GrpcUtils { sink.tryEmitComplete(); }); - return sink.asFlux() .doOnComplete(() -> { - channel.shutdown(); + channel.shutdownNow(); + executor.shutdownNow(); + }) + .doOnError(e -> { + e.printStackTrace(); + channel.shutdownNow(); executor.shutdownNow(); }); } @@ -196,11 +208,13 @@ final class GrpcUtils { @Override public void onError(Throwable t) { t.printStackTrace(); + channel.shutdownNow(); } @Override public void onCompleted() { logger.info("Client completed"); + channel.shutdownNow(); } }; @@ -220,7 +234,11 @@ final class GrpcUtils { } }).doOnComplete(() -> { requestObserver.onCompleted(); - }).subscribe(); + }).doOnError(e -> { + e.printStackTrace(); + channel.shutdownNow(); + }) + .subscribe(); try { return resultRef.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); @@ -229,6 +247,9 @@ final class GrpcUtils { Thread.currentThread().interrupt(); throw new IllegalStateException(ie); } + finally { + channel.shutdownNow(); + } } private static ClientResponseObserver clientResponseObserver(Flux> inputStream, Many> sink) { diff --git a/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/MessageHandlingHelper.java b/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/MessageHandlingHelper.java index 2016b7f10..462f59b74 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/MessageHandlingHelper.java +++ b/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/MessageHandlingHelper.java @@ -185,18 +185,26 @@ public class MessageHandlingHelper implements Smar if (function.isOutputTypePublisher()) { return this.biStreamReactive(responseObserver, serverCallStreamObserver, grpcMessageType); } - throw new UnsupportedOperationException("The bi-directional streaming is " + UnsupportedOperationException ex = new UnsupportedOperationException("The bi-directional streaming is " + "not supported for functions that accept Publisher but return non-Publisher: " + function); +// responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request") +// .withCause(ex).asException()); + responseObserver.onCompleted(); + throw ex; } else { if (!function.isOutputTypePublisher()) { return this.biStreamImperative(responseObserver, serverCallStreamObserver, wasReady); } - throw new UnsupportedOperationException("The bidirection streaming is " + + UnsupportedOperationException ex = new UnsupportedOperationException("The bidirection streaming is " + "not supported for functions that accept non-Publisher but return Publisher: " + function); - +// responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request") +// .withCause(ex).asException()); + responseObserver.onCompleted(); + throw ex; } } diff --git a/spring-cloud-function-adapters/spring-cloud-function-grpc/src/test/java/org/springframework/cloud/function/grpc/GrpcInteractionTests.java b/spring-cloud-function-adapters/spring-cloud-function-grpc/src/test/java/org/springframework/cloud/function/grpc/GrpcInteractionTests.java index 0341afc5c..7be2a21e2 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-grpc/src/test/java/org/springframework/cloud/function/grpc/GrpcInteractionTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-grpc/src/test/java/org/springframework/cloud/function/grpc/GrpcInteractionTests.java @@ -39,7 +39,6 @@ import org.springframework.util.MimeTypeUtils; import org.springframework.util.SocketUtils; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.fail; /** * @@ -225,13 +224,7 @@ public class GrpcInteractionTests { Flux> clientResponseObserver = GrpcUtils.biStreaming("localhost", port, Flux.fromIterable(messages)); - try { - clientResponseObserver.collectList().block(Duration.ofSeconds(1)); - fail(); - } - catch (Exception e) { - // TODO: handle exception - } + assertThat(clientResponseObserver.collectList().block(Duration.ofSeconds(2))).isEmpty(); } } @@ -256,13 +249,7 @@ public class GrpcInteractionTests { Flux> clientResponseObserver = GrpcUtils.biStreaming("localhost", port, Flux.fromIterable(messages)); - try { - clientResponseObserver.collectList().block(Duration.ofSeconds(1)); - fail(); - } - catch (Exception e) { - // TODO: handle exception - } + assertThat(clientResponseObserver.collectList().block(Duration.ofSeconds(2))).isEmpty(); } }