diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java index 65e0460d5e..664e23c219 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java @@ -84,6 +84,8 @@ public class Reactor2TcpClient

implements TcpOperations

{ private final EventLoopGroup eventLoopGroup; + private final Environment environment; + private final TcpClientFactory, Message

> tcpClientSpecFactory; private final List, Message

>> tcpClients = @@ -108,12 +110,13 @@ public class Reactor2TcpClient

implements TcpOperations

{ // Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup(); this.eventLoopGroup = nioEventLoopGroup; + this.environment = new Environment(new SynchronousDispatcherConfigReader()); this.tcpClientSpecFactory = new TcpClientFactory, Message

>() { @Override public TcpClientSpec, Message

> apply(TcpClientSpec, Message

> spec) { return spec - .env(new Environment(new SynchronousDispatcherConfigReader())) + .env(environment) .codec(codec) .connect(host, port) .options(createClientSocketOptions()); @@ -139,6 +142,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null"); this.tcpClientSpecFactory = tcpClientSpecFactory; this.eventLoopGroup = null; + this.environment = null; } @@ -269,6 +273,15 @@ public class Reactor2TcpClient

implements TcpOperations

{ promise = eventLoopPromise; } + if (this.environment != null) { + promise.onComplete(new Consumer>() { + @Override + public void accept(Promise voidPromise) { + environment.shutdown(); + } + }); + } + return new PassThroughPromiseToListenableFutureAdapter(promise); }