Additional cleanup in gRPC module
This commit is contained in:
@@ -17,6 +17,8 @@
|
||||
package org.springframework.cloud.function.grpc;
|
||||
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
@@ -29,6 +31,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import com.google.protobuf.ByteString;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.stub.ClientCallStreamObserver;
|
||||
import io.grpc.stub.ClientResponseObserver;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
@@ -236,7 +239,9 @@ final class GrpcUtils {
|
||||
requestObserver.onCompleted();
|
||||
}).doOnError(e -> {
|
||||
e.printStackTrace();
|
||||
channel.shutdownNow();
|
||||
responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request")
|
||||
.withCause(e).asRuntimeException());
|
||||
// channel.shutdownNow();
|
||||
})
|
||||
.subscribe();
|
||||
|
||||
@@ -247,9 +252,9 @@ final class GrpcUtils {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IllegalStateException(ie);
|
||||
}
|
||||
finally {
|
||||
channel.shutdownNow();
|
||||
}
|
||||
// finally {
|
||||
// channel.shutdownNow();
|
||||
// }
|
||||
}
|
||||
|
||||
private static ClientResponseObserver<GrpcSpringMessage, GrpcSpringMessage> clientResponseObserver(Flux<Message<byte[]>> inputStream, Many<Message<byte[]>> sink) {
|
||||
|
||||
@@ -147,7 +147,9 @@ public class MessageHandlingHelper<T extends GeneratedMessageV3> implements Smar
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
t.printStackTrace();
|
||||
responseObserver.onCompleted();
|
||||
// responseObserver.onCompleted();
|
||||
responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request")
|
||||
.withCause(t).asRuntimeException());
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -160,7 +162,9 @@ public class MessageHandlingHelper<T extends GeneratedMessageV3> implements Smar
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
responseObserver.onCompleted();
|
||||
finally {
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -232,7 +236,6 @@ public class MessageHandlingHelper<T extends GeneratedMessageV3> implements Smar
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("gRPC Server receiving: " + inputMessage);
|
||||
}
|
||||
//GRPC_MESSAGE_TYPE = (Class<T>) inputMessage.getClass();
|
||||
inputStream.tryEmitNext(toSpringMessage(inputMessage));
|
||||
serverCallStreamObserver.request(1);
|
||||
}
|
||||
@@ -240,7 +243,10 @@ public class MessageHandlingHelper<T extends GeneratedMessageV3> implements Smar
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
t.printStackTrace();
|
||||
responseObserver.onCompleted();
|
||||
//responseObserver.onCompleted();
|
||||
inputStream.tryEmitComplete();
|
||||
responseObserver.onError(Status.UNKNOWN.withDescription("Error handling request")
|
||||
.withCause(t).asException());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user