From bc3a4ea7cac48861272e640a1befb31d06c60fc2 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Fri, 24 Sep 2021 16:02:30 +0200 Subject: [PATCH] GH-744 Make default wait time indefinite for client-side streaming when awaiting response --- .../cloud/function/grpc/GrpcServerMessageHandler.java | 2 +- .../org/springframework/cloud/function/grpc/GrpcUtils.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcServerMessageHandler.java b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcServerMessageHandler.java index dc5960cd5..0b366666b 100644 --- a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcServerMessageHandler.java +++ b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcServerMessageHandler.java @@ -153,7 +153,7 @@ class GrpcServerMessageHandler extends MessagingServiceImplBase { logger.info("gRPC Server has finished receiving data."); inputStream.tryEmitComplete(); try { - responseObserver.onNext(GrpcUtils.toGrpcMessage(resultRef.poll(5000, TimeUnit.MILLISECONDS))); + responseObserver.onNext(GrpcUtils.toGrpcMessage(resultRef.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS))); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java index f0d7367e0..3f2315588 100644 --- a/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java +++ b/spring-cloud-function-grpc/src/main/java/org/springframework/cloud/function/grpc/GrpcUtils.java @@ -191,7 +191,7 @@ final class GrpcUtils { }).subscribe(); try { - return resultRef.poll(5000, TimeUnit.MILLISECONDS); + return resultRef.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -215,7 +215,7 @@ final class GrpcUtils { inputStream .doOnNext(request -> { if (logger.isDebugEnabled()) { - logger.debug("Sending message: " + request); + logger.debug("Streaming message to function: " + request); } requestStreamObserver.onNext(GrpcUtils.toGrpcMessage(request)); }) @@ -230,7 +230,7 @@ final class GrpcUtils { @Override public void onNext(GrpcMessage message) { if (logger.isDebugEnabled()) { - logger.debug("Receiving message: " + message); + logger.debug("Streaming message from function: " + message); } sink.tryEmitNext(fromGrpcMessage(message)); requestStreamObserver.request(1);