Improve RSocketForwardingFunction

* Use `Mono<RSocket>` for lazy connection on target subscription
returned from the `RSocketForwardingFunction`
* Propagate `retry` into an `RSocketConnector`

Resolves #566
This commit is contained in:
Artem Bilan
2020-07-27 16:44:39 -04:00
committed by Oleg Zhurakousky
parent 6ca9c2f072
commit 8d316f906c

View File

@@ -29,6 +29,7 @@ import io.rsocket.util.DefaultPayload;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
@@ -36,41 +37,44 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
/**
*
* @author Oleg Zhurakousky
* @author Artem Bilan
*
* @since 3.1
*
*/
class RSocketForwardingFunction implements Function<Message<byte[]>, Publisher<Message<byte[]>>> {
private static Log logger = LogFactory.getLog(RSocketForwardingFunction.class);
private final RSocket rSocket;
private static final Log LOGGER = LogFactory.getLog(RSocketForwardingFunction.class);
private final Mono<RSocket> rsocketMono;
private final FunctionInvocationWrapper targetFunction;
RSocketForwardingFunction(FunctionInvocationWrapper targetFunction, InetSocketAddress outputAddress) {
this.targetFunction = targetFunction;
this.rSocket = outputAddress == null ? null
: RSocketConnector.connectWith(TcpClientTransport.create(outputAddress))
.log()
.retryWhen(Retry.backoff(5, Duration.ofSeconds(1)))
.block();
this.rsocketMono =
outputAddress == null
? null
: RSocketConnector.create()
.reconnect(Retry.backoff(5, Duration.ofSeconds(1)))
.connect(TcpClientTransport.create(outputAddress));
}
@SuppressWarnings("unchecked")
@Override
public Publisher<Message<byte[]>> apply(Message<byte[]> input) {
if (logger.isDebugEnabled()) {
logger.debug("Executiing: " + this.targetFunction);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Executing: " + this.targetFunction);
}
Object rawResult = this.targetFunction.apply(input);
Publisher<Message<byte[]>> resultMessage = this.rSocket
.requestStream(DefaultPayload.create(((Message<byte[]>) rawResult).getPayload()))
.map(this::buildResultMessage);
return resultMessage;
return this.rsocketMono
.flatMapMany((rsocket) ->
rsocket.requestStream(DefaultPayload.create(((Message<byte[]>) rawResult).getPayload())))
.map(this::buildResultMessage);
}
private Message<byte[]> buildResultMessage(Payload payload) {
@@ -79,4 +83,5 @@ class RSocketForwardingFunction implements Function<Message<byte[]>, Publisher<M
payloadBuffer.get(payloadData);
return MessageBuilder.withPayload(payloadData).build();
}
}