From bd40a93604bbff9e41085639628114fc0c0898c2 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 28 Apr 2016 17:46:46 -0400 Subject: [PATCH] Ensure Environment.shutdown() in Reactor2TcpClient Issue: SPR-14229 --- .../messaging/tcp/reactor/Reactor2TcpClient.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java index 65e0460d5e..664e23c219 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java @@ -84,6 +84,8 @@ public class Reactor2TcpClient

implements TcpOperations

{ private final EventLoopGroup eventLoopGroup; + private final Environment environment; + private final TcpClientFactory, Message

> tcpClientSpecFactory; private final List, Message

>> tcpClients = @@ -108,12 +110,13 @@ public class Reactor2TcpClient

implements TcpOperations

{ // Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup(); this.eventLoopGroup = nioEventLoopGroup; + this.environment = new Environment(new SynchronousDispatcherConfigReader()); this.tcpClientSpecFactory = new TcpClientFactory, Message

>() { @Override public TcpClientSpec, Message

> apply(TcpClientSpec, Message

> spec) { return spec - .env(new Environment(new SynchronousDispatcherConfigReader())) + .env(environment) .codec(codec) .connect(host, port) .options(createClientSocketOptions()); @@ -139,6 +142,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null"); this.tcpClientSpecFactory = tcpClientSpecFactory; this.eventLoopGroup = null; + this.environment = null; } @@ -269,6 +273,15 @@ public class Reactor2TcpClient

implements TcpOperations

{ promise = eventLoopPromise; } + if (this.environment != null) { + promise.onComplete(new Consumer>() { + @Override + public void accept(Promise voidPromise) { + environment.shutdown(); + } + }); + } + return new PassThroughPromiseToListenableFutureAdapter(promise); }