diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 15ca26fa05..d9549ee933 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -50,7 +50,7 @@ import org.springframework.util.concurrent.ListenableFuture; /** * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} * based on the TCP client support of the Reactor project. - *
+ * *
This implementation wraps N (Reactor) clients for N {@link #connect} calls, * i.e. a separate (Reactor) client instance for each connection. * @@ -78,7 +78,6 @@ public class ReactorNettyTcpClient
implements TcpOperations
{ * threads will be shared amongst the active clients. *
Also see the constructor accepting a {@link Consumer} of * {@link ClientOptions} for additional options. - * * @param host the host to connect to * @param port the port to connect to * @param codec for encoding and decoding messages @@ -88,17 +87,13 @@ public class ReactorNettyTcpClient
implements TcpOperations
{ } /** - * A constructor with a configurator {@link Consumer} that will receive - * default {@link ClientOptions} from {@link TcpClient}. This might be used - * to add SSL or specific network parameters to the generated client - * configuration. - * + * A constructor with a configurator {@link Consumer} that will receive default + * {@link ClientOptions} from {@link TcpClient}. This might be used to add SSL + * or specific network parameters to the generated client configuration. * @param tcpOptions callback for configuring shared {@link ClientOptions} * @param codec for encoding and decoding messages */ - public ReactorNettyTcpClient(Consumer super ClientOptions> tcpOptions, - ReactorNettyCodec
codec) { - + public ReactorNettyTcpClient(Consumer super ClientOptions> tcpOptions, ReactorNettyCodec
codec) { Assert.notNull(codec, "'codec' is required"); this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); this.tcpClient = TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group))); @@ -165,14 +160,13 @@ public class ReactorNettyTcpClient
implements TcpOperations
{
this.stopping = true;
Mono
- implements BiFunction implements BiFunction connectionHandler;
@@ -180,10 +174,7 @@ public class ReactorNettyTcpClient implements TcpOperations {
private final Scheduler scheduler;
-
- MessageHandler(TcpConnectionHandler handler, ReactorNettyCodec codec,
- Scheduler scheduler) {
-
+ MessageHandler(TcpConnectionHandler handler, ReactorNettyCodec codec, Scheduler scheduler) {
this.connectionHandler = handler;
this.codec = codec;
this.scheduler = scheduler;
@@ -192,7 +183,6 @@ public class ReactorNettyTcpClient implements TcpOperations {
@Override
public Publisher tcpConnection =
@@ -209,14 +199,13 @@ public class ReactorNettyTcpClient implements TcpOperations {
return closeProcessor;
}
-
}
+
private static final class Reconnector