Commit a0946c89 authored by Andy Wilkinson's avatar Andy Wilkinson

Start building against Reactor 2020.0.0 snapshots

See gh-21927
parent 632d9bab
......@@ -79,8 +79,8 @@ public class NettyWebServerFactoryCustomizer
}
private void customizeConnectionTimeout(NettyReactiveWebServerFactory factory, Duration connectionTimeout) {
factory.addServerCustomizers((httpServer) -> httpServer.tcpConfiguration((tcpServer) -> tcpServer
.selectorOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectionTimeout.toMillis())));
factory.addServerCustomizers((httpServer) -> httpServer.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
(int) connectionTimeout.toMillis()));
}
}
......@@ -19,7 +19,6 @@ package org.springframework.boot.autoconfigure.web.embedded;
import java.time.Duration;
import java.util.Map;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
......@@ -27,7 +26,6 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
import reactor.netty.http.server.HttpServer;
import reactor.netty.tcp.TcpServer;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.boot.autoconfigure.web.ServerProperties.ForwardHeadersStrategy;
......@@ -35,7 +33,6 @@ import org.springframework.boot.context.properties.source.ConfigurationPropertyS
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.embedded.netty.NettyServerCustomizer;
import org.springframework.mock.env.MockEnvironment;
import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
......@@ -110,7 +107,6 @@ class NettyWebServerFactoryCustomizerTests {
verifyConnectionTimeout(factory, 1000);
}
@SuppressWarnings("unchecked")
private void verifyConnectionTimeout(NettyReactiveWebServerFactory factory, Integer expected) {
if (expected == null) {
verify(factory, never()).addServerCustomizers(any(NettyServerCustomizer.class));
......@@ -119,10 +115,8 @@ class NettyWebServerFactoryCustomizerTests {
verify(factory, times(1)).addServerCustomizers(this.customizerCaptor.capture());
NettyServerCustomizer serverCustomizer = this.customizerCaptor.getValue();
HttpServer httpServer = serverCustomizer.apply(HttpServer.create());
TcpServer tcpConfiguration = ReflectionTestUtils.invokeMethod(httpServer, "tcpConfiguration");
ServerBootstrap bootstrap = tcpConfiguration.configure();
Map<Object, Object> options = (Map<Object, Object>) ReflectionTestUtils.getField(bootstrap, "options");
assertThat(options).containsEntry(ChannelOption.CONNECT_TIMEOUT_MILLIS, expected);
Map<ChannelOption<?>, ?> options = httpServer.configuration().options();
assertThat(options.get(ChannelOption.CONNECT_TIMEOUT_MILLIS)).isEqualTo(expected);
}
private void setupConnectionTimeout(Duration connectionTimeout) {
......
......@@ -1439,7 +1439,7 @@ bom {
]
}
}
library("Reactor Bom", "Dysprosium-SR7") {
library("Reactor Bom", "2020.0.0-SNAPSHOT") {
group("io.projectreactor") {
imports = [
"reactor-bom"
......
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -19,7 +19,6 @@ package org.springframework.boot.docs.web.reactive.function.client;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -39,10 +38,10 @@ public class ReactorNettyClientCustomizationExample {
// tag::custom-http-connector[]
@Bean
ClientHttpConnector clientHttpConnector(ReactorResourceFactory resourceFactory) {
TcpClient tcpClient = TcpClient.create(resourceFactory.getConnectionProvider())
HttpClient httpClient = HttpClient.create(resourceFactory.getConnectionProvider())
.runOn(resourceFactory.getLoopResources()).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
.doOnConnected((connection) -> connection.addHandlerLast(new ReadTimeoutHandler(60)));
return new ReactorClientHttpConnector(HttpClient.from(tcpClient));
return new ReactorClientHttpConnector(httpClient);
}
// end::custom-http-connector[]
......
......@@ -172,8 +172,8 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur
private ServerTransport<CloseableChannel> createWebSocketTransport() {
if (this.resourceFactory != null) {
HttpServer httpServer = HttpServer.create().tcpConfiguration((tcpServer) -> tcpServer
.runOn(this.resourceFactory.getLoopResources()).bindAddress(this::getListenAddress));
HttpServer httpServer = HttpServer.create().runOn(this.resourceFactory.getLoopResources())
.bindAddress(this::getListenAddress);
return WebsocketServerTransport.create(httpServer);
}
return WebsocketServerTransport.create(getListenAddress());
......
......@@ -154,11 +154,10 @@ public class NettyReactiveWebServerFactory extends AbstractReactiveWebServerFact
if (this.resourceFactory != null) {
LoopResources resources = this.resourceFactory.getLoopResources();
Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
server = server
.tcpConfiguration((tcpServer) -> tcpServer.runOn(resources).bindAddress(this::getListenAddress));
server = server.runOn(resources).bindAddress(this::getListenAddress);
}
else {
server = server.tcpConfiguration((tcpServer) -> tcpServer.bindAddress(this::getListenAddress));
server = server.bindAddress(this::getListenAddress);
}
if (getSsl() != null && getSsl().isEnabled()) {
SslServerCustomizer sslServerCustomizer = new SslServerCustomizer(getSsl(), getHttp2(),
......
......@@ -456,9 +456,8 @@ public abstract class AbstractReactiveWebServerFactoryTests {
this.webServer.start();
HttpClient client = HttpClient.create().wiretap(true).compress(true)
.tcpConfiguration((tcpClient) -> tcpClient.doOnConnected(
(connection) -> connection.channel().pipeline().addBefore(NettyPipeline.HttpDecompressor,
"CompressionTest", new CompressionDetectionHandler())));
.doOnConnected((connection) -> connection.channel().pipeline().addBefore(NettyPipeline.HttpDecompressor,
"CompressionTest", new CompressionDetectionHandler()));
return getWebClient(client, this.webServer.getPort()).build();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment