From 8d316f906c2236737d7eca403faaf54bff2df0d1 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 27 Jul 2020 16:44:39 -0400 Subject: [PATCH] Improve RSocketForwardingFunction * Use `Mono` for lazy connection on target subscription returned from the `RSocketForwardingFunction` * Propagate `retry` into an `RSocketConnector` Resolves #566 --- .../rsocket/RSocketForwardingFunction.java | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java index cb76b93ba..489d3253d 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java @@ -29,6 +29,7 @@ import io.rsocket.util.DefaultPayload; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; import reactor.util.retry.Retry; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; @@ -36,41 +37,44 @@ import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; - /** * * @author Oleg Zhurakousky + * @author Artem Bilan + * * @since 3.1 * */ class RSocketForwardingFunction implements Function, Publisher>> { - private static Log logger = LogFactory.getLog(RSocketForwardingFunction.class); - private final RSocket rSocket; + private static final Log LOGGER = LogFactory.getLog(RSocketForwardingFunction.class); + + private final Mono rsocketMono; private final FunctionInvocationWrapper targetFunction; RSocketForwardingFunction(FunctionInvocationWrapper targetFunction, InetSocketAddress outputAddress) { this.targetFunction = targetFunction; - this.rSocket = outputAddress == null ? null - : RSocketConnector.connectWith(TcpClientTransport.create(outputAddress)) - .log() - .retryWhen(Retry.backoff(5, Duration.ofSeconds(1))) - .block(); + this.rsocketMono = + outputAddress == null + ? null + : RSocketConnector.create() + .reconnect(Retry.backoff(5, Duration.ofSeconds(1))) + .connect(TcpClientTransport.create(outputAddress)); } @SuppressWarnings("unchecked") @Override public Publisher> apply(Message input) { - if (logger.isDebugEnabled()) { - logger.debug("Executiing: " + this.targetFunction); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Executing: " + this.targetFunction); } Object rawResult = this.targetFunction.apply(input); - Publisher> resultMessage = this.rSocket - .requestStream(DefaultPayload.create(((Message) rawResult).getPayload())) - .map(this::buildResultMessage); - return resultMessage; + return this.rsocketMono + .flatMapMany((rsocket) -> + rsocket.requestStream(DefaultPayload.create(((Message) rawResult).getPayload()))) + .map(this::buildResultMessage); } private Message buildResultMessage(Payload payload) { @@ -79,4 +83,5 @@ class RSocketForwardingFunction implements Function, Publisher