From ab94368d18aa4ac8289de3b879136e28c0205a03 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 19 Oct 2021 08:07:06 +0200 Subject: [PATCH] Additional cleanup in gRPC module --- .../cloud/function/grpc/GrpcUtils.java | 13 +++++++++---- .../cloud/function/grpc/MessageHandlingHelper.java | 14 ++++++++++---- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java b/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java index b267e9c82..8e12836a7 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java +++ b/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java @@ -17,6 +17,8 @@ package org.springframework.cloud.function.grpc; +import java.io.Closeable; +import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -29,6 +31,7 @@ import java.util.concurrent.TimeUnit; import com.google.protobuf.ByteString; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; @@ -236,7 +239,9 @@ final class GrpcUtils { requestObserver.onCompleted(); }).doOnError(e -> { e.printStackTrace(); - channel.shutdownNow(); + responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request") + .withCause(e).asRuntimeException()); +// channel.shutdownNow(); }) .subscribe(); @@ -247,9 +252,9 @@ final class GrpcUtils { Thread.currentThread().interrupt(); throw new IllegalStateException(ie); } - finally { - channel.shutdownNow(); - } +// finally { +// channel.shutdownNow(); +// } } private static ClientResponseObserver clientResponseObserver(Flux> inputStream, Many> sink) { diff --git a/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/MessageHandlingHelper.java b/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/MessageHandlingHelper.java index 462f59b74..deb1fd58b 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/MessageHandlingHelper.java +++ b/spring-cloud-function-adapters/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/MessageHandlingHelper.java @@ -147,7 +147,9 @@ public class MessageHandlingHelper implements Smar @Override public void onError(Throwable t) { t.printStackTrace(); - responseObserver.onCompleted(); +// responseObserver.onCompleted(); + responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request") + .withCause(t).asRuntimeException()); } @Override @@ -160,7 +162,9 @@ public class MessageHandlingHelper implements Smar catch (InterruptedException e) { Thread.currentThread().interrupt(); } - responseObserver.onCompleted(); + finally { + responseObserver.onCompleted(); + } } }; } @@ -232,7 +236,6 @@ public class MessageHandlingHelper implements Smar if (logger.isDebugEnabled()) { logger.debug("gRPC Server receiving: " + inputMessage); } - //GRPC_MESSAGE_TYPE = (Class) inputMessage.getClass(); inputStream.tryEmitNext(toSpringMessage(inputMessage)); serverCallStreamObserver.request(1); } @@ -240,7 +243,10 @@ public class MessageHandlingHelper implements Smar @Override public void onError(Throwable t) { t.printStackTrace(); - responseObserver.onCompleted(); + //responseObserver.onCompleted(); + inputStream.tryEmitComplete(); + responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request") + .withCause(t).asException()); } @Override