From ae62341fa3d0a9d8de5e0d9e2b8a8dc39fd1c4fa Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Thu, 22 Dec 2016 22:35:54 +0100 Subject: [PATCH] Avoid deprecated Scheduler.shutdown() in favor of Scheduler.dispose() --- .../tcp/reactor/ReactorNettyTcpClient.java | 29 ++++++------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 15ca26fa05..d9549ee933 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -50,7 +50,7 @@ import org.springframework.util.concurrent.ListenableFuture; /** * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} * based on the TCP client support of the Reactor project. - *

+ * *

This implementation wraps N (Reactor) clients for N {@link #connect} calls, * i.e. a separate (Reactor) client instance for each connection. * @@ -78,7 +78,6 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * threads will be shared amongst the active clients. *

Also see the constructor accepting a {@link Consumer} of * {@link ClientOptions} for additional options. - * * @param host the host to connect to * @param port the port to connect to * @param codec for encoding and decoding messages @@ -88,17 +87,13 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ } /** - * A constructor with a configurator {@link Consumer} that will receive - * default {@link ClientOptions} from {@link TcpClient}. This might be used - * to add SSL or specific network parameters to the generated client - * configuration. - * + * A constructor with a configurator {@link Consumer} that will receive default + * {@link ClientOptions} from {@link TcpClient}. This might be used to add SSL + * or specific network parameters to the generated client configuration. * @param tcpOptions callback for configuring shared {@link ClientOptions} * @param codec for encoding and decoding messages */ - public ReactorNettyTcpClient(Consumer tcpOptions, - ReactorNettyCodec

codec) { - + public ReactorNettyTcpClient(Consumer tcpOptions, ReactorNettyCodec

codec) { Assert.notNull(codec, "'codec' is required"); this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); this.tcpClient = TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group))); @@ -165,14 +160,13 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ this.stopping = true; Mono completion = FutureMono.from(this.group.close()) - .doAfterTerminate((x, e) -> this.scheduler.shutdown()); + .doAfterTerminate((x, e) -> this.scheduler.dispose()); return new MonoToListenableFutureAdapter<>(completion); } - private static final class MessageHandler

- implements BiFunction> { + private static final class MessageHandler

implements BiFunction> { private final TcpConnectionHandler

connectionHandler; @@ -180,10 +174,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ private final Scheduler scheduler; - - MessageHandler(TcpConnectionHandler

handler, ReactorNettyCodec

codec, - Scheduler scheduler) { - + MessageHandler(TcpConnectionHandler

handler, ReactorNettyCodec

codec, Scheduler scheduler) { this.connectionHandler = handler; this.codec = codec; this.scheduler = scheduler; @@ -192,7 +183,6 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ @Override public Publisher apply(NettyInbound in, NettyOutbound out) { Flux>> inbound = in.receive().map(this.codec.getDecoder()); - DirectProcessor closeProcessor = DirectProcessor.create(); TcpConnection

tcpConnection = @@ -209,14 +199,13 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ return closeProcessor; } - } + private static final class Reconnector implements Function, Publisher> { private final ReconnectStrategy strategy; - Reconnector(ReconnectStrategy strategy) { this.strategy = strategy; }