diff --git a/build.gradle b/build.gradle
index 9c443424f0..73b0d64559 100644
--- a/build.gradle
+++ b/build.gradle
@@ -247,6 +247,9 @@ configure(rootProject) {
imports {
mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
}
+ dependencies {
+ dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT"
+ }
resolutionStrategy {
cacheChangingModulesFor 0, 'seconds'
}
diff --git a/spring-messaging/spring-messaging.gradle b/spring-messaging/spring-messaging.gradle
index f1da8e7c5f..ecbdc12b1d 100644
--- a/spring-messaging/spring-messaging.gradle
+++ b/spring-messaging/spring-messaging.gradle
@@ -7,6 +7,9 @@ dependencyManagement {
mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
mavenBom "io.netty:netty-bom:${nettyVersion}"
}
+ dependencies {
+ dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT"
+ }
resolutionStrategy {
cacheChangingModulesFor 0, 'seconds'
}
@@ -18,7 +21,7 @@ dependencies {
compile(project(":spring-core"))
optional(project(":spring-context"))
optional(project(":spring-oxm"))
- optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT")
+ optional("io.projectreactor.netty:reactor-netty")
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
exclude group: "javax.servlet", module: "javax.servlet-api"
}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java
index 0277ebaa43..62e5981471 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java
@@ -90,25 +90,30 @@ public class ReactorNettyTcpClient
implements TcpOperations
{
/**
- * Simple constructor with a host and a port.
+ * Simple constructor with the host and port to use to connect to.
+ *
This constructor manages the lifecycle of the {@link TcpClient} and
+ * underlying resources such as {@link ConnectionProvider},
+ * {@link LoopResources}, and {@link ChannelGroup}.
+ *
For full control over the initialization and lifecycle of the
+ * TcpClient, use {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}.
* @param host the host to connect to
* @param port the port to connect to
- * @param codec the code to use
+ * @param codec for encoding and decoding the input/output byte streams
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec
codec) {
Assert.notNull(host, "host is required");
- Assert.notNull(port, "port is required");
Assert.notNull(codec, "ReactorNettyCodec is required");
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
- this.tcpClient = TcpClient.create(poolResources)
- .host(host)
- .port(port)
- .runOn(loopResources, false)
- .doOnConnected(c -> channelGroup.add(c.channel()));
+
+ this.tcpClient = TcpClient.create(this.poolResources)
+ .host(host).port(port)
+ .runOn(this.loopResources, false)
+ .doOnConnected(conn -> this.channelGroup.add(conn.channel()));
+
this.codec = codec;
}
@@ -117,7 +122,7 @@ public class ReactorNettyTcpClient
implements TcpOperations
{
* lifecycle is expected to be managed externally.
*
* @param tcpClient the TcpClient instance to use
- * @param codec the code to use
+ * @param codec for encoding and decoding the input/output byte streams
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(TcpClient tcpClient, ReactorNettyCodec
codec) {
@@ -264,16 +269,16 @@ public class ReactorNettyTcpClient
implements TcpOperations
{
@Override
@SuppressWarnings("unchecked")
public Publisher apply(NettyInbound inbound, NettyOutbound outbound) {
- inbound.withConnection(c -> {
+ inbound.withConnection(conn -> {
if (logger.isDebugEnabled()) {
- logger.debug("Connected to " + c.address());
+ logger.debug("Connected to " + conn.address());
}
});
DirectProcessor completion = DirectProcessor.create();
TcpConnection connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
scheduler.schedule(() -> connectionHandler.afterConnected(connection));
- inbound.withConnection(c -> c.addHandler(new StompMessageDecoder<>(codec)));
+ inbound.withConnection(conn -> conn.addHandler(new StompMessageDecoder<>(codec)));
inbound.receiveObject()
.cast(Message.class)
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java
index e130bcefce..1ae9519867 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2017 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -64,13 +64,13 @@ public class ReactorNettyTcpConnection
implements TcpConnection
{
@Override
@SuppressWarnings("deprecation")
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
- this.inbound.withConnection(c -> c.onReadIdle(inactivityDuration, runnable));
+ this.inbound.withConnection(conn -> conn.onReadIdle(inactivityDuration, runnable));
}
@Override
@SuppressWarnings("deprecation")
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
- this.inbound.withConnection(c -> c.onWriteIdle(inactivityDuration, runnable));
+ this.inbound.withConnection(conn -> conn.onWriteIdle(inactivityDuration, runnable));
}
@Override
diff --git a/spring-test/spring-test.gradle b/spring-test/spring-test.gradle
index 516e6c04fa..1f0824a736 100644
--- a/spring-test/spring-test.gradle
+++ b/spring-test/spring-test.gradle
@@ -7,6 +7,9 @@ dependencyManagement {
mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
mavenBom "io.netty:netty-bom:${nettyVersion}"
}
+ dependencies {
+ dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT"
+ }
resolutionStrategy {
cacheChangingModulesFor 0, 'seconds'
}
@@ -80,7 +83,7 @@ dependencies {
testCompile("org.apache.httpcomponents:httpclient:4.5.5") {
exclude group: "commons-logging", module: "commons-logging"
}
- testCompile('io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT')
+ testCompile('io.projectreactor.netty:reactor-netty')
testCompile('de.bechte.junit:junit-hierarchicalcontextrunner:4.12.1')
// Pull in the latest JUnit 5 Launcher API and the Vintage engine as well
// so that we can run JUnit 4 tests in IntelliJ IDEA.
diff --git a/spring-web/spring-web.gradle b/spring-web/spring-web.gradle
index f2162a2984..280f6cbe3f 100644
--- a/spring-web/spring-web.gradle
+++ b/spring-web/spring-web.gradle
@@ -8,6 +8,9 @@ dependencyManagement {
mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
mavenBom "io.netty:netty-bom:${nettyVersion}"
}
+ dependencies {
+ dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT"
+ }
resolutionStrategy {
cacheChangingModulesFor 0, 'seconds'
}
@@ -34,7 +37,7 @@ dependencies {
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
optional("io.reactivex.rxjava2:rxjava:${rxjava2Version}")
optional("io.netty:netty-all")
- optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT")
+ optional("io.projectreactor.netty:reactor-netty")
optional("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
optional("org.eclipse.jetty:jetty-server:${jettyVersion}") {
exclude group: "javax.servlet", module: "javax.servlet-api"
diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpConnector.java
index 59260e81bf..4f437bcfea 100644
--- a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpConnector.java
+++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpConnector.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2017 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,16 +35,15 @@ public interface ClientHttpConnector {
/**
* Connect to the origin server using the given {@code HttpMethod} and
- * {@code URI}, then apply the given {@code requestCallback} on the
- * {@link ClientHttpRequest} once the connection has been established.
- *
Return a publisher of the {@link ClientHttpResponse}.
+ * {@code URI} and apply the given {@code requestCallback} when the HTTP
+ * request of the underlying API can be initialized and written to.
* @param method the HTTP request method
* @param uri the HTTP request URI
- * @param requestCallback a function that prepares and writes the request,
- * returning a publisher that signals when it's done interacting with the
- * request. Implementations should return a {@code Mono} by calling
+ * @param requestCallback a function that prepares and writes to the request,
+ * returning a publisher that signals when it's done writing.
+ * Implementations can return a {@code Mono} by calling
* {@link ClientHttpRequest#writeWith} or {@link ClientHttpRequest#setComplete}.
- * @return a publisher of the {@link ClientHttpResponse}
+ * @return publisher for the {@link ClientHttpResponse}
*/
Mono connect(HttpMethod method, URI uri,
Function super ClientHttpRequest, Mono> requestCallback);
diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java
index ab21605ee9..042376e43c 100644
--- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java
+++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2017 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,16 +44,17 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
/**
* Create a Reactor Netty {@link ClientHttpConnector}
- * with a default configuration and HTTP compression support enabled.
+ * with default configuration and HTTP compression support enabled.
*/
public ReactorClientHttpConnector() {
- this.httpClient = HttpClient.create()
- .compress();
+ this.httpClient = HttpClient.create().compress();
}
/**
- * Create a Reactor Netty {@link ClientHttpConnector} with the given
- * {@link HttpClient}
+ * Create a Reactor Netty {@link ClientHttpConnector} with a fully
+ * configured {@code HttpClient}.
+ * @param httpClient the client instance to use
+ * @since 5.1
*/
public ReactorClientHttpConnector(HttpClient httpClient) {
this.httpClient = httpClient;
@@ -69,24 +70,23 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
}
return this.httpClient
- .request(adaptHttpMethod(method))
+ .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()))
.uri(uri.toString())
- .send((req, out) -> requestCallback.apply(adaptRequest(method, uri, req, out)))
+ .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
.responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc())))
.next();
}
- private io.netty.handler.codec.http.HttpMethod adaptHttpMethod(HttpMethod method) {
- return io.netty.handler.codec.http.HttpMethod.valueOf(method.name());
- }
+ private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request,
+ NettyOutbound nettyOutbound) {
- private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound out) {
- return new ReactorClientHttpRequest(method, uri, request, out);
+ return new ReactorClientHttpRequest(method, uri, request, nettyOutbound);
}
private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound,
- ByteBufAllocator alloc) {
- return new ReactorClientHttpResponse(response, nettyInbound, alloc);
+ ByteBufAllocator allocator) {
+
+ return new ReactorClientHttpResponse(response, nettyInbound, allocator);
}
}
diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java
index 349447a622..a74a1033d5 100644
--- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java
+++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2017 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -47,20 +47,19 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero
private final URI uri;
- private final HttpClientRequest httpRequest;
+ private final HttpClientRequest request;
- private final NettyOutbound out;
+ private final NettyOutbound outbound;
private final NettyDataBufferFactory bufferFactory;
- public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri,
- HttpClientRequest httpRequest, NettyOutbound out) {
- this.httpMethod = httpMethod;
+ public ReactorClientHttpRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound outbound) {
+ this.httpMethod = method;
this.uri = uri;
- this.httpRequest = httpRequest;
- this.out = out;
- this.bufferFactory = new NettyDataBufferFactory(out.alloc());
+ this.request = request;
+ this.outbound = outbound;
+ this.bufferFactory = new NettyDataBufferFactory(outbound.alloc());
}
@@ -81,14 +80,16 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero
@Override
public Mono writeWith(Publisher extends DataBuffer> body) {
- return doCommit(() -> this.out
- .send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)).then());
+ return doCommit(() -> {
+ Flux byteBufFlux = Flux.from(body).map(NettyDataBufferFactory::toByteBuf);
+ return this.outbound.send(byteBufFlux).then();
+ });
}
@Override
public Mono writeAndFlushWith(Publisher extends Publisher extends DataBuffer>> body) {
Publisher> byteBufs = Flux.from(body).map(ReactorClientHttpRequest::toByteBufs);
- return doCommit(() -> this.out.sendGroups(byteBufs).then());
+ return doCommit(() -> this.outbound.sendGroups(byteBufs).then());
}
private static Publisher toByteBufs(Publisher extends DataBuffer> dataBuffers) {
@@ -97,24 +98,24 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero
@Override
public Mono writeWith(File file, long position, long count) {
- return doCommit(() -> this.out.sendFile(file.toPath(), position, count).then());
+ return doCommit(() -> this.outbound.sendFile(file.toPath(), position, count).then());
}
@Override
public Mono setComplete() {
- return doCommit(() -> out.then());
+ return doCommit(this.outbound::then);
}
@Override
protected void applyHeaders() {
- getHeaders().entrySet().forEach(e -> this.httpRequest.requestHeaders().set(e.getKey(), e.getValue()));
+ getHeaders().forEach((key, value) -> this.request.requestHeaders().set(key, value));
}
@Override
protected void applyCookies() {
getCookies().values().stream().flatMap(Collection::stream)
.map(cookie -> new DefaultCookie(cookie.getName(), cookie.getValue()))
- .forEach(this.httpRequest::addCookie);
+ .forEach(this.request::addCookie);
}
}
diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java
index 9e7b4df5b3..dc5a10a89f 100644
--- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java
+++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java
@@ -42,28 +42,26 @@ import io.netty.buffer.ByteBufAllocator;
*/
class ReactorClientHttpResponse implements ClientHttpResponse {
- private final NettyDataBufferFactory dataBufferFactory;
+ private final NettyDataBufferFactory bufferFactory;
private final HttpClientResponse response;
- private final NettyInbound nettyInbound;
+ private final NettyInbound inbound;
- public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound nettyInbound,
- ByteBufAllocator alloc) {
+ public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) {
this.response = response;
- this.nettyInbound = nettyInbound;
- this.dataBufferFactory = new NettyDataBufferFactory(alloc);
+ this.inbound = inbound;
+ this.bufferFactory = new NettyDataBufferFactory(alloc);
}
@Override
public Flux getBody() {
- return nettyInbound
- .receive()
- .map(buf -> {
- buf.retain();
- return dataBufferFactory.wrap(buf);
+ return this.inbound.receive()
+ .map(byteBuf -> {
+ byteBuf.retain();
+ return this.bufferFactory.wrap(byteBuf);
});
}
diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java
index dba2f4c56a..a688ca7d5f 100644
--- a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java
+++ b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2017 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,14 +31,14 @@ public class ReactorHttpServer extends AbstractHttpServer {
private reactor.netty.http.server.HttpServer reactorServer;
- private AtomicReference disposableServer = new AtomicReference<>();
+ private AtomicReference serverRef = new AtomicReference<>();
@Override
- protected void initServer() throws Exception {
+ protected void initServer() {
this.reactorHandler = createHttpHandlerAdapter();
this.reactorServer = reactor.netty.http.server.HttpServer.create()
- .tcpConfiguration(tcpServer -> tcpServer.host(getHost()))
+ .tcpConfiguration(server -> server.host(getHost()))
.port(getPort());
}
@@ -48,21 +48,21 @@ public class ReactorHttpServer extends AbstractHttpServer {
@Override
protected void startInternal() {
- DisposableServer disposableServer = this.reactorServer.handle(this.reactorHandler).bind().block();
- setPort(disposableServer.address().getPort());
- this.disposableServer.set(disposableServer);
+ DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block();
+ setPort(server.address().getPort());
+ this.serverRef.set(server);
}
@Override
protected void stopInternal() {
- this.disposableServer.get().dispose();
+ this.serverRef.get().dispose();
}
@Override
protected void resetInternal() {
this.reactorServer = null;
this.reactorHandler = null;
- this.disposableServer.set(null);
+ this.serverRef.set(null);
}
}
diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java
index 8717440ccb..8195165bfa 100644
--- a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java
+++ b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2017 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,15 +31,14 @@ public class ReactorHttpsServer extends AbstractHttpServer {
private reactor.netty.http.server.HttpServer reactorServer;
- private AtomicReference disposableServer = new AtomicReference<>();
+ private AtomicReference serverRef = new AtomicReference<>();
@Override
- protected void initServer() throws Exception {
+ protected void initServer() {
this.reactorHandler = createHttpHandlerAdapter();
this.reactorServer = reactor.netty.http.server.HttpServer.create()
- .tcpConfiguration(tcpServer -> tcpServer.host(getHost())
- .secure())
+ .tcpConfiguration(server -> server.host(getHost()).secure())
.port(getPort());
}
@@ -49,21 +48,21 @@ public class ReactorHttpsServer extends AbstractHttpServer {
@Override
protected void startInternal() {
- DisposableServer disposableServer = this.reactorServer.handle(this.reactorHandler).bind().block();
- setPort(disposableServer.address().getPort());
- this.disposableServer.set(disposableServer);
+ DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block();
+ setPort(server.address().getPort());
+ this.serverRef.set(server);
}
@Override
protected void stopInternal() {
- this.disposableServer.get().dispose();
+ this.serverRef.get().dispose();
}
@Override
protected void resetInternal() {
this.reactorServer = null;
this.reactorHandler = null;
- this.disposableServer.set(null);
+ this.serverRef.set(null);
}
}
diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle
index c3697b994a..9e9e99e126 100644
--- a/spring-webflux/spring-webflux.gradle
+++ b/spring-webflux/spring-webflux.gradle
@@ -7,6 +7,9 @@ dependencyManagement {
mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
mavenBom "io.netty:netty-bom:${nettyVersion}"
}
+ dependencies {
+ dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT"
+ }
resolutionStrategy {
cacheChangingModulesFor 0, 'seconds'
}
@@ -28,7 +31,7 @@ dependencies {
optional("com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${jackson2Version}")
optional("io.reactivex:rxjava:${rxjavaVersion}")
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
- optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT")
+ optional("io.projectreactor.netty:reactor-netty")
optional("org.apache.tomcat:tomcat-websocket:${tomcatVersion}") {
exclude group: "org.apache.tomcat", module: "tomcat-websocket-api"
exclude group: "org.apache.tomcat", module: "tomcat-servlet-api"
diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java
index 0699e2cebe..55591f0c50 100644
--- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java
+++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2017 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,11 +17,11 @@ package org.springframework.web.reactive.socket.client;
import java.net.URI;
import java.util.List;
-import java.util.function.Consumer;
import io.netty.buffer.ByteBufAllocator;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
+import reactor.netty.http.websocket.WebsocketInbound;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
@@ -51,6 +51,7 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
/**
* Constructor that accepts an existing {@link HttpClient} builder.
+ * @since 5.1
*/
public ReactorNettyWebSocketClient(HttpClient httpClient) {
this.httpClient = httpClient;
@@ -71,32 +72,32 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
}
@Override
- public Mono execute(URI url, HttpHeaders headers, WebSocketHandler handler) {
- List protocols = beforeHandshake(url, headers, handler);
+ public Mono execute(URI url, HttpHeaders httpHeaders, WebSocketHandler handler) {
+ List protocols = beforeHandshake(url, httpHeaders, handler);
return getHttpClient()
- .headers(nettyHeaders -> setNettyHeaders(headers, nettyHeaders))
+ .headers(nettyHeaders -> setNettyHeaders(httpHeaders, nettyHeaders))
.websocket(StringUtils.collectionToCommaDelimitedString(protocols))
.uri(url.toString())
- .handle((in, out) -> {
- HandshakeInfo info = afterHandshake(url, toHttpHeaders(in.headers()));
- ByteBufAllocator allocator = out.alloc();
- NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator);
- WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory);
+ .handle((inbound, outbound) -> {
+ HandshakeInfo info = afterHandshake(url, toHttpHeaders(inbound));
+ NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc());
+ WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory);
return handler.handle(session);
})
.next();
}
- private void setNettyHeaders(HttpHeaders headers, io.netty.handler.codec.http.HttpHeaders nettyHeaders) {
- headers.forEach(nettyHeaders::set);
+ private void setNettyHeaders(HttpHeaders httpHeaders, io.netty.handler.codec.http.HttpHeaders nettyHeaders) {
+ httpHeaders.forEach(nettyHeaders::set);
}
- private HttpHeaders toHttpHeaders(io.netty.handler.codec.http.HttpHeaders responseHeaders) {
+ private HttpHeaders toHttpHeaders(WebsocketInbound inbound) {
HttpHeaders headers = new HttpHeaders();
- responseHeaders.forEach(entry -> {
+ io.netty.handler.codec.http.HttpHeaders nettyHeaders = inbound.headers();
+ nettyHeaders.forEach(entry -> {
String name = entry.getKey();
- headers.put(name, responseHeaders.getAll(name));
+ headers.put(name, nettyHeaders.getAll(name));
});
return headers;
}
diff --git a/spring-websocket/spring-websocket.gradle b/spring-websocket/spring-websocket.gradle
index bba796f188..8a3a384ab1 100644
--- a/spring-websocket/spring-websocket.gradle
+++ b/spring-websocket/spring-websocket.gradle
@@ -7,6 +7,9 @@ dependencyManagement {
mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
mavenBom "io.netty:netty-bom:${nettyVersion}"
}
+ dependencies {
+ dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT"
+ }
resolutionStrategy {
cacheChangingModulesFor 0, 'seconds'
}
@@ -44,5 +47,5 @@ dependencies {
optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}")
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}")
- testCompile("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT")
+ testCompile("io.projectreactor.netty:reactor-netty")
}