From a3216432b5b86e1e42110af1738ea66bdd58b163 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 31 May 2018 15:35:53 -0400 Subject: [PATCH] Polish Issue: SPR-16387 --- build.gradle | 3 ++ spring-messaging/spring-messaging.gradle | 5 ++- .../tcp/reactor/ReactorNettyTcpClient.java | 29 +++++++++------- .../reactor/ReactorNettyTcpConnection.java | 6 ++-- spring-test/spring-test.gradle | 5 ++- spring-web/spring-web.gradle | 5 ++- .../client/reactive/ClientHttpConnector.java | 15 ++++----- .../reactive/ReactorClientHttpConnector.java | 30 ++++++++--------- .../reactive/ReactorClientHttpRequest.java | 33 ++++++++++--------- .../reactive/ReactorClientHttpResponse.java | 20 +++++------ .../reactive/bootstrap/ReactorHttpServer.java | 18 +++++----- .../bootstrap/ReactorHttpsServer.java | 19 +++++------ spring-webflux/spring-webflux.gradle | 5 ++- .../client/ReactorNettyWebSocketClient.java | 31 ++++++++--------- spring-websocket/spring-websocket.gradle | 5 ++- 15 files changed, 125 insertions(+), 104 deletions(-) diff --git a/build.gradle b/build.gradle index 9c443424f0..73b0d64559 100644 --- a/build.gradle +++ b/build.gradle @@ -247,6 +247,9 @@ configure(rootProject) { imports { mavenBom "io.projectreactor:reactor-bom:${reactorVersion}" } + dependencies { + dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT" + } resolutionStrategy { cacheChangingModulesFor 0, 'seconds' } diff --git a/spring-messaging/spring-messaging.gradle b/spring-messaging/spring-messaging.gradle index f1da8e7c5f..ecbdc12b1d 100644 --- a/spring-messaging/spring-messaging.gradle +++ b/spring-messaging/spring-messaging.gradle @@ -7,6 +7,9 @@ dependencyManagement { mavenBom "io.projectreactor:reactor-bom:${reactorVersion}" mavenBom "io.netty:netty-bom:${nettyVersion}" } + dependencies { + dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT" + } resolutionStrategy { cacheChangingModulesFor 0, 'seconds' } @@ -18,7 +21,7 @@ dependencies { compile(project(":spring-core")) optional(project(":spring-context")) optional(project(":spring-oxm")) - optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT") + optional("io.projectreactor.netty:reactor-netty") optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") { exclude group: "javax.servlet", module: "javax.servlet-api" } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 0277ebaa43..62e5981471 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -90,25 +90,30 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ /** - * Simple constructor with a host and a port. + * Simple constructor with the host and port to use to connect to. + *

This constructor manages the lifecycle of the {@link TcpClient} and + * underlying resources such as {@link ConnectionProvider}, + * {@link LoopResources}, and {@link ChannelGroup}. + *

For full control over the initialization and lifecycle of the + * TcpClient, use {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}. * @param host the host to connect to * @param port the port to connect to - * @param codec the code to use + * @param codec for encoding and decoding the input/output byte streams * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec */ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec

codec) { Assert.notNull(host, "host is required"); - Assert.notNull(port, "port is required"); Assert.notNull(codec, "ReactorNettyCodec is required"); this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); this.loopResources = LoopResources.create("tcp-client-loop"); this.poolResources = ConnectionProvider.elastic("tcp-client-pool"); - this.tcpClient = TcpClient.create(poolResources) - .host(host) - .port(port) - .runOn(loopResources, false) - .doOnConnected(c -> channelGroup.add(c.channel())); + + this.tcpClient = TcpClient.create(this.poolResources) + .host(host).port(port) + .runOn(this.loopResources, false) + .doOnConnected(conn -> this.channelGroup.add(conn.channel())); + this.codec = codec; } @@ -117,7 +122,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * lifecycle is expected to be managed externally. * * @param tcpClient the TcpClient instance to use - * @param codec the code to use + * @param codec for encoding and decoding the input/output byte streams * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec */ public ReactorNettyTcpClient(TcpClient tcpClient, ReactorNettyCodec

codec) { @@ -264,16 +269,16 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ @Override @SuppressWarnings("unchecked") public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { - inbound.withConnection(c -> { + inbound.withConnection(conn -> { if (logger.isDebugEnabled()) { - logger.debug("Connected to " + c.address()); + logger.debug("Connected to " + conn.address()); } }); DirectProcessor completion = DirectProcessor.create(); TcpConnection

connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); scheduler.schedule(() -> connectionHandler.afterConnected(connection)); - inbound.withConnection(c -> c.addHandler(new StompMessageDecoder<>(codec))); + inbound.withConnection(conn -> conn.addHandler(new StompMessageDecoder<>(codec))); inbound.receiveObject() .cast(Message.class) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index e130bcefce..1ae9519867 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,13 +64,13 @@ public class ReactorNettyTcpConnection

implements TcpConnection

{ @Override @SuppressWarnings("deprecation") public void onReadInactivity(Runnable runnable, long inactivityDuration) { - this.inbound.withConnection(c -> c.onReadIdle(inactivityDuration, runnable)); + this.inbound.withConnection(conn -> conn.onReadIdle(inactivityDuration, runnable)); } @Override @SuppressWarnings("deprecation") public void onWriteInactivity(Runnable runnable, long inactivityDuration) { - this.inbound.withConnection(c -> c.onWriteIdle(inactivityDuration, runnable)); + this.inbound.withConnection(conn -> conn.onWriteIdle(inactivityDuration, runnable)); } @Override diff --git a/spring-test/spring-test.gradle b/spring-test/spring-test.gradle index 516e6c04fa..1f0824a736 100644 --- a/spring-test/spring-test.gradle +++ b/spring-test/spring-test.gradle @@ -7,6 +7,9 @@ dependencyManagement { mavenBom "io.projectreactor:reactor-bom:${reactorVersion}" mavenBom "io.netty:netty-bom:${nettyVersion}" } + dependencies { + dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT" + } resolutionStrategy { cacheChangingModulesFor 0, 'seconds' } @@ -80,7 +83,7 @@ dependencies { testCompile("org.apache.httpcomponents:httpclient:4.5.5") { exclude group: "commons-logging", module: "commons-logging" } - testCompile('io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT') + testCompile('io.projectreactor.netty:reactor-netty') testCompile('de.bechte.junit:junit-hierarchicalcontextrunner:4.12.1') // Pull in the latest JUnit 5 Launcher API and the Vintage engine as well // so that we can run JUnit 4 tests in IntelliJ IDEA. diff --git a/spring-web/spring-web.gradle b/spring-web/spring-web.gradle index f2162a2984..280f6cbe3f 100644 --- a/spring-web/spring-web.gradle +++ b/spring-web/spring-web.gradle @@ -8,6 +8,9 @@ dependencyManagement { mavenBom "io.projectreactor:reactor-bom:${reactorVersion}" mavenBom "io.netty:netty-bom:${nettyVersion}" } + dependencies { + dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT" + } resolutionStrategy { cacheChangingModulesFor 0, 'seconds' } @@ -34,7 +37,7 @@ dependencies { optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") optional("io.reactivex.rxjava2:rxjava:${rxjava2Version}") optional("io.netty:netty-all") - optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT") + optional("io.projectreactor.netty:reactor-netty") optional("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") optional("org.eclipse.jetty:jetty-server:${jettyVersion}") { exclude group: "javax.servlet", module: "javax.servlet-api" diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpConnector.java index 59260e81bf..4f437bcfea 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpConnector.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,16 +35,15 @@ public interface ClientHttpConnector { /** * Connect to the origin server using the given {@code HttpMethod} and - * {@code URI}, then apply the given {@code requestCallback} on the - * {@link ClientHttpRequest} once the connection has been established. - *

Return a publisher of the {@link ClientHttpResponse}. + * {@code URI} and apply the given {@code requestCallback} when the HTTP + * request of the underlying API can be initialized and written to. * @param method the HTTP request method * @param uri the HTTP request URI - * @param requestCallback a function that prepares and writes the request, - * returning a publisher that signals when it's done interacting with the - * request. Implementations should return a {@code Mono} by calling + * @param requestCallback a function that prepares and writes to the request, + * returning a publisher that signals when it's done writing. + * Implementations can return a {@code Mono} by calling * {@link ClientHttpRequest#writeWith} or {@link ClientHttpRequest#setComplete}. - * @return a publisher of the {@link ClientHttpResponse} + * @return publisher for the {@link ClientHttpResponse} */ Mono connect(HttpMethod method, URI uri, Function> requestCallback); diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index ab21605ee9..042376e43c 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,16 +44,17 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { /** * Create a Reactor Netty {@link ClientHttpConnector} - * with a default configuration and HTTP compression support enabled. + * with default configuration and HTTP compression support enabled. */ public ReactorClientHttpConnector() { - this.httpClient = HttpClient.create() - .compress(); + this.httpClient = HttpClient.create().compress(); } /** - * Create a Reactor Netty {@link ClientHttpConnector} with the given - * {@link HttpClient} + * Create a Reactor Netty {@link ClientHttpConnector} with a fully + * configured {@code HttpClient}. + * @param httpClient the client instance to use + * @since 5.1 */ public ReactorClientHttpConnector(HttpClient httpClient) { this.httpClient = httpClient; @@ -69,24 +70,23 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { } return this.httpClient - .request(adaptHttpMethod(method)) + .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name())) .uri(uri.toString()) - .send((req, out) -> requestCallback.apply(adaptRequest(method, uri, req, out))) + .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) .responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc()))) .next(); } - private io.netty.handler.codec.http.HttpMethod adaptHttpMethod(HttpMethod method) { - return io.netty.handler.codec.http.HttpMethod.valueOf(method.name()); - } + private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, + NettyOutbound nettyOutbound) { - private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound out) { - return new ReactorClientHttpRequest(method, uri, request, out); + return new ReactorClientHttpRequest(method, uri, request, nettyOutbound); } private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound, - ByteBufAllocator alloc) { - return new ReactorClientHttpResponse(response, nettyInbound, alloc); + ByteBufAllocator allocator) { + + return new ReactorClientHttpResponse(response, nettyInbound, allocator); } } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java index 349447a622..a74a1033d5 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,20 +47,19 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero private final URI uri; - private final HttpClientRequest httpRequest; + private final HttpClientRequest request; - private final NettyOutbound out; + private final NettyOutbound outbound; private final NettyDataBufferFactory bufferFactory; - public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, - HttpClientRequest httpRequest, NettyOutbound out) { - this.httpMethod = httpMethod; + public ReactorClientHttpRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound outbound) { + this.httpMethod = method; this.uri = uri; - this.httpRequest = httpRequest; - this.out = out; - this.bufferFactory = new NettyDataBufferFactory(out.alloc()); + this.request = request; + this.outbound = outbound; + this.bufferFactory = new NettyDataBufferFactory(outbound.alloc()); } @@ -81,14 +80,16 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero @Override public Mono writeWith(Publisher body) { - return doCommit(() -> this.out - .send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)).then()); + return doCommit(() -> { + Flux byteBufFlux = Flux.from(body).map(NettyDataBufferFactory::toByteBuf); + return this.outbound.send(byteBufFlux).then(); + }); } @Override public Mono writeAndFlushWith(Publisher> body) { Publisher> byteBufs = Flux.from(body).map(ReactorClientHttpRequest::toByteBufs); - return doCommit(() -> this.out.sendGroups(byteBufs).then()); + return doCommit(() -> this.outbound.sendGroups(byteBufs).then()); } private static Publisher toByteBufs(Publisher dataBuffers) { @@ -97,24 +98,24 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero @Override public Mono writeWith(File file, long position, long count) { - return doCommit(() -> this.out.sendFile(file.toPath(), position, count).then()); + return doCommit(() -> this.outbound.sendFile(file.toPath(), position, count).then()); } @Override public Mono setComplete() { - return doCommit(() -> out.then()); + return doCommit(this.outbound::then); } @Override protected void applyHeaders() { - getHeaders().entrySet().forEach(e -> this.httpRequest.requestHeaders().set(e.getKey(), e.getValue())); + getHeaders().forEach((key, value) -> this.request.requestHeaders().set(key, value)); } @Override protected void applyCookies() { getCookies().values().stream().flatMap(Collection::stream) .map(cookie -> new DefaultCookie(cookie.getName(), cookie.getValue())) - .forEach(this.httpRequest::addCookie); + .forEach(this.request::addCookie); } } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index 9e7b4df5b3..dc5a10a89f 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -42,28 +42,26 @@ import io.netty.buffer.ByteBufAllocator; */ class ReactorClientHttpResponse implements ClientHttpResponse { - private final NettyDataBufferFactory dataBufferFactory; + private final NettyDataBufferFactory bufferFactory; private final HttpClientResponse response; - private final NettyInbound nettyInbound; + private final NettyInbound inbound; - public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound nettyInbound, - ByteBufAllocator alloc) { + public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) { this.response = response; - this.nettyInbound = nettyInbound; - this.dataBufferFactory = new NettyDataBufferFactory(alloc); + this.inbound = inbound; + this.bufferFactory = new NettyDataBufferFactory(alloc); } @Override public Flux getBody() { - return nettyInbound - .receive() - .map(buf -> { - buf.retain(); - return dataBufferFactory.wrap(buf); + return this.inbound.receive() + .map(byteBuf -> { + byteBuf.retain(); + return this.bufferFactory.wrap(byteBuf); }); } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java index dba2f4c56a..a688ca7d5f 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,14 +31,14 @@ public class ReactorHttpServer extends AbstractHttpServer { private reactor.netty.http.server.HttpServer reactorServer; - private AtomicReference disposableServer = new AtomicReference<>(); + private AtomicReference serverRef = new AtomicReference<>(); @Override - protected void initServer() throws Exception { + protected void initServer() { this.reactorHandler = createHttpHandlerAdapter(); this.reactorServer = reactor.netty.http.server.HttpServer.create() - .tcpConfiguration(tcpServer -> tcpServer.host(getHost())) + .tcpConfiguration(server -> server.host(getHost())) .port(getPort()); } @@ -48,21 +48,21 @@ public class ReactorHttpServer extends AbstractHttpServer { @Override protected void startInternal() { - DisposableServer disposableServer = this.reactorServer.handle(this.reactorHandler).bind().block(); - setPort(disposableServer.address().getPort()); - this.disposableServer.set(disposableServer); + DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block(); + setPort(server.address().getPort()); + this.serverRef.set(server); } @Override protected void stopInternal() { - this.disposableServer.get().dispose(); + this.serverRef.get().dispose(); } @Override protected void resetInternal() { this.reactorServer = null; this.reactorHandler = null; - this.disposableServer.set(null); + this.serverRef.set(null); } } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java index 8717440ccb..8195165bfa 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,15 +31,14 @@ public class ReactorHttpsServer extends AbstractHttpServer { private reactor.netty.http.server.HttpServer reactorServer; - private AtomicReference disposableServer = new AtomicReference<>(); + private AtomicReference serverRef = new AtomicReference<>(); @Override - protected void initServer() throws Exception { + protected void initServer() { this.reactorHandler = createHttpHandlerAdapter(); this.reactorServer = reactor.netty.http.server.HttpServer.create() - .tcpConfiguration(tcpServer -> tcpServer.host(getHost()) - .secure()) + .tcpConfiguration(server -> server.host(getHost()).secure()) .port(getPort()); } @@ -49,21 +48,21 @@ public class ReactorHttpsServer extends AbstractHttpServer { @Override protected void startInternal() { - DisposableServer disposableServer = this.reactorServer.handle(this.reactorHandler).bind().block(); - setPort(disposableServer.address().getPort()); - this.disposableServer.set(disposableServer); + DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block(); + setPort(server.address().getPort()); + this.serverRef.set(server); } @Override protected void stopInternal() { - this.disposableServer.get().dispose(); + this.serverRef.get().dispose(); } @Override protected void resetInternal() { this.reactorServer = null; this.reactorHandler = null; - this.disposableServer.set(null); + this.serverRef.set(null); } } diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle index c3697b994a..9e9e99e126 100644 --- a/spring-webflux/spring-webflux.gradle +++ b/spring-webflux/spring-webflux.gradle @@ -7,6 +7,9 @@ dependencyManagement { mavenBom "io.projectreactor:reactor-bom:${reactorVersion}" mavenBom "io.netty:netty-bom:${nettyVersion}" } + dependencies { + dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT" + } resolutionStrategy { cacheChangingModulesFor 0, 'seconds' } @@ -28,7 +31,7 @@ dependencies { optional("com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${jackson2Version}") optional("io.reactivex:rxjava:${rxjavaVersion}") optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") - optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT") + optional("io.projectreactor.netty:reactor-netty") optional("org.apache.tomcat:tomcat-websocket:${tomcatVersion}") { exclude group: "org.apache.tomcat", module: "tomcat-websocket-api" exclude group: "org.apache.tomcat", module: "tomcat-servlet-api" diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java index 0699e2cebe..55591f0c50 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,11 @@ package org.springframework.web.reactive.socket.client; import java.net.URI; import java.util.List; -import java.util.function.Consumer; import io.netty.buffer.ByteBufAllocator; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; +import reactor.netty.http.websocket.WebsocketInbound; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; @@ -51,6 +51,7 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen /** * Constructor that accepts an existing {@link HttpClient} builder. + * @since 5.1 */ public ReactorNettyWebSocketClient(HttpClient httpClient) { this.httpClient = httpClient; @@ -71,32 +72,32 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen } @Override - public Mono execute(URI url, HttpHeaders headers, WebSocketHandler handler) { - List protocols = beforeHandshake(url, headers, handler); + public Mono execute(URI url, HttpHeaders httpHeaders, WebSocketHandler handler) { + List protocols = beforeHandshake(url, httpHeaders, handler); return getHttpClient() - .headers(nettyHeaders -> setNettyHeaders(headers, nettyHeaders)) + .headers(nettyHeaders -> setNettyHeaders(httpHeaders, nettyHeaders)) .websocket(StringUtils.collectionToCommaDelimitedString(protocols)) .uri(url.toString()) - .handle((in, out) -> { - HandshakeInfo info = afterHandshake(url, toHttpHeaders(in.headers())); - ByteBufAllocator allocator = out.alloc(); - NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator); - WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory); + .handle((inbound, outbound) -> { + HandshakeInfo info = afterHandshake(url, toHttpHeaders(inbound)); + NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc()); + WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory); return handler.handle(session); }) .next(); } - private void setNettyHeaders(HttpHeaders headers, io.netty.handler.codec.http.HttpHeaders nettyHeaders) { - headers.forEach(nettyHeaders::set); + private void setNettyHeaders(HttpHeaders httpHeaders, io.netty.handler.codec.http.HttpHeaders nettyHeaders) { + httpHeaders.forEach(nettyHeaders::set); } - private HttpHeaders toHttpHeaders(io.netty.handler.codec.http.HttpHeaders responseHeaders) { + private HttpHeaders toHttpHeaders(WebsocketInbound inbound) { HttpHeaders headers = new HttpHeaders(); - responseHeaders.forEach(entry -> { + io.netty.handler.codec.http.HttpHeaders nettyHeaders = inbound.headers(); + nettyHeaders.forEach(entry -> { String name = entry.getKey(); - headers.put(name, responseHeaders.getAll(name)); + headers.put(name, nettyHeaders.getAll(name)); }); return headers; } diff --git a/spring-websocket/spring-websocket.gradle b/spring-websocket/spring-websocket.gradle index bba796f188..8a3a384ab1 100644 --- a/spring-websocket/spring-websocket.gradle +++ b/spring-websocket/spring-websocket.gradle @@ -7,6 +7,9 @@ dependencyManagement { mavenBom "io.projectreactor:reactor-bom:${reactorVersion}" mavenBom "io.netty:netty-bom:${nettyVersion}" } + dependencies { + dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT" + } resolutionStrategy { cacheChangingModulesFor 0, 'seconds' } @@ -44,5 +47,5 @@ dependencies { optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}") testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}") - testCompile("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT") + testCompile("io.projectreactor.netty:reactor-netty") }