Additional cleanup in gRPC module

This commit is contained in:
Oleg Zhurakousky
2021-10-19 08:07:06 +02:00
parent 03a1b4e98f
commit c8c0357786
2 changed files with 19 additions and 8 deletions

View File

@@ -17,6 +17,8 @@
package org.springframework.cloud.function.grpc;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -29,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
@@ -236,7 +239,9 @@ final class GrpcUtils {
requestObserver.onCompleted();
}).doOnError(e -> {
e.printStackTrace();
channel.shutdownNow();
responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request")
.withCause(e).asRuntimeException());
// channel.shutdownNow();
})
.subscribe();
@@ -247,9 +252,9 @@ final class GrpcUtils {
Thread.currentThread().interrupt();
throw new IllegalStateException(ie);
}
finally {
channel.shutdownNow();
}
// finally {
// channel.shutdownNow();
// }
}
private static ClientResponseObserver<GrpcSpringMessage, GrpcSpringMessage> clientResponseObserver(Flux<Message<byte[]>> inputStream, Many<Message<byte[]>> sink) {

View File

@@ -147,7 +147,9 @@ public class MessageHandlingHelper<T extends GeneratedMessageV3> implements Smar
@Override
public void onError(Throwable t) {
t.printStackTrace();
responseObserver.onCompleted();
// responseObserver.onCompleted();
responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request")
.withCause(t).asRuntimeException());
}
@Override
@@ -160,7 +162,9 @@ public class MessageHandlingHelper<T extends GeneratedMessageV3> implements Smar
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
responseObserver.onCompleted();
finally {
responseObserver.onCompleted();
}
}
};
}
@@ -232,7 +236,6 @@ public class MessageHandlingHelper<T extends GeneratedMessageV3> implements Smar
if (logger.isDebugEnabled()) {
logger.debug("gRPC Server receiving: " + inputMessage);
}
//GRPC_MESSAGE_TYPE = (Class<T>) inputMessage.getClass();
inputStream.tryEmitNext(toSpringMessage(inputMessage));
serverCallStreamObserver.request(1);
}
@@ -240,7 +243,10 @@ public class MessageHandlingHelper<T extends GeneratedMessageV3> implements Smar
@Override
public void onError(Throwable t) {
t.printStackTrace();
responseObserver.onCompleted();
//responseObserver.onCompleted();
inputStream.tryEmitComplete();
responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request")
.withCause(t).asException());
}
@Override