Move RxNetty support to test scope
The RxNetty runtime support is not meant to be officially supported and should be restricted to testing purposes only. Issue: SPR-15444
This commit is contained in:
@@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.server.reactive;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
|
||||
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
|
||||
import io.reactivex.netty.protocol.http.server.RequestHandler;
|
||||
import reactor.core.publisher.Mono;
|
||||
import rx.Observable;
|
||||
import rx.RxReactiveStreams;
|
||||
|
||||
/**
|
||||
* Adapt {@link HttpHandler} to the RxNetty {@link RequestHandler}.
|
||||
* For internal use within the framework.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.0
|
||||
*/
|
||||
public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBuf> {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(RxNettyHttpHandlerAdapter.class);
|
||||
|
||||
|
||||
private final HttpHandler httpHandler;
|
||||
|
||||
|
||||
public RxNettyHttpHandlerAdapter(HttpHandler httpHandler) {
|
||||
Assert.notNull(httpHandler, "HttpHandler must not be null");
|
||||
this.httpHandler = httpHandler;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Observable<Void> handle(HttpServerRequest<ByteBuf> nativeRequest,
|
||||
HttpServerResponse<ByteBuf> nativeResponse) {
|
||||
|
||||
Channel channel = nativeResponse.unsafeNettyChannel();
|
||||
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(channel.alloc());
|
||||
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
|
||||
|
||||
RxNettyServerHttpRequest request = new RxNettyServerHttpRequest(nativeRequest, bufferFactory, remoteAddress);
|
||||
RxNettyServerHttpResponse response = new RxNettyServerHttpResponse(nativeResponse, bufferFactory);
|
||||
|
||||
Publisher<Void> result = this.httpHandler.handle(request, response)
|
||||
.onErrorResume(ex -> {
|
||||
logger.error("Could not complete request", ex);
|
||||
nativeResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
|
||||
return Mono.empty();
|
||||
})
|
||||
.doOnSuccess(aVoid -> logger.debug("Successfully completed request"));
|
||||
|
||||
return RxReactiveStreams.toObservable(result);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,125 @@
|
||||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.server.reactive;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Optional;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.handler.codec.http.cookie.Cookie;
|
||||
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
|
||||
import reactor.core.publisher.Flux;
|
||||
import rx.Observable;
|
||||
import rx.RxReactiveStreams;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.http.HttpCookie;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
|
||||
/**
|
||||
* Adapt {@link ServerHttpRequest} to the RxNetty {@link HttpServerRequest}.
|
||||
* For internal use within the framework.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Stephane Maldini
|
||||
* @since 5.0
|
||||
*/
|
||||
public class RxNettyServerHttpRequest extends AbstractServerHttpRequest {
|
||||
|
||||
private final HttpServerRequest<ByteBuf> request;
|
||||
|
||||
private final NettyDataBufferFactory dataBufferFactory;
|
||||
|
||||
private final InetSocketAddress remoteAddress;
|
||||
|
||||
|
||||
public RxNettyServerHttpRequest(HttpServerRequest<ByteBuf> request,
|
||||
NettyDataBufferFactory dataBufferFactory, InetSocketAddress remoteAddress) {
|
||||
|
||||
super(initUri(request, remoteAddress), initHeaders(request));
|
||||
|
||||
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
|
||||
this.request = request;
|
||||
this.dataBufferFactory = dataBufferFactory;
|
||||
this.remoteAddress = remoteAddress;
|
||||
}
|
||||
|
||||
private static URI initUri(HttpServerRequest<ByteBuf> request, InetSocketAddress remoteAddress) {
|
||||
Assert.notNull(request, "'request' must not be null");
|
||||
String requestUri = request.getUri();
|
||||
return remoteAddress != null ? getBaseUrl(remoteAddress).resolve(requestUri) : URI.create(requestUri);
|
||||
}
|
||||
|
||||
private static URI getBaseUrl(InetSocketAddress address) {
|
||||
try {
|
||||
return new URI(null, null, address.getHostString(), address.getPort(), null, null, null);
|
||||
}
|
||||
catch (URISyntaxException ex) {
|
||||
// Should not happen...
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private static HttpHeaders initHeaders(HttpServerRequest<ByteBuf> request) {
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
for (String name : request.getHeaderNames()) {
|
||||
headers.put(name, request.getAllHeaderValues(name));
|
||||
}
|
||||
return headers;
|
||||
}
|
||||
|
||||
|
||||
public HttpServerRequest<ByteBuf> getRxNettyRequest() {
|
||||
return this.request;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpMethod getMethod() {
|
||||
return HttpMethod.valueOf(this.request.getHttpMethod().name());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MultiValueMap<String, HttpCookie> initCookies() {
|
||||
MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>();
|
||||
for (String name : this.request.getCookies().keySet()) {
|
||||
for (Cookie cookie : this.request.getCookies().get(name)) {
|
||||
HttpCookie httpCookie = new HttpCookie(name, cookie.value());
|
||||
cookies.add(name, httpCookie);
|
||||
}
|
||||
}
|
||||
return cookies;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<InetSocketAddress> getRemoteAddress() {
|
||||
return Optional.ofNullable(this.remoteAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<DataBuffer> getBody() {
|
||||
Observable<DataBuffer> content = this.request.getContent().map(dataBufferFactory::wrap);
|
||||
return Flux.from(RxReactiveStreams.toPublisher(content));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,177 @@
|
||||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.server.reactive;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||
import io.netty.handler.codec.http.cookie.Cookie;
|
||||
import io.netty.handler.codec.http.cookie.DefaultCookie;
|
||||
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
|
||||
import io.reactivex.netty.protocol.http.server.ResponseContentWriter;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import rx.Observable;
|
||||
import rx.RxReactiveStreams;
|
||||
import rx.functions.Func1;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseCookie;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Adapt {@link ServerHttpResponse} to the RxNetty {@link HttpServerResponse}.
|
||||
* For internal use within the framework.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Stephane Maldini
|
||||
* @author Sebastien Deleuze
|
||||
* @since 5.0
|
||||
*/
|
||||
public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
|
||||
|
||||
private final HttpServerResponse<ByteBuf> response;
|
||||
|
||||
private static final ByteBuf FLUSH_SIGNAL = Unpooled.buffer(0, 0);
|
||||
|
||||
// 8 Kb flush threshold to avoid blocking RxNetty when the send buffer has reached the high watermark
|
||||
private static final long FLUSH_THRESHOLD = 8192;
|
||||
|
||||
public RxNettyServerHttpResponse(HttpServerResponse<ByteBuf> response,
|
||||
NettyDataBufferFactory dataBufferFactory) {
|
||||
super(dataBufferFactory);
|
||||
Assert.notNull(response, "'response' must not be null.");
|
||||
|
||||
this.response = response;
|
||||
}
|
||||
|
||||
|
||||
public HttpServerResponse<?> getRxNettyResponse() {
|
||||
return this.response;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void applyStatusCode() {
|
||||
HttpStatus statusCode = this.getStatusCode();
|
||||
if (statusCode != null) {
|
||||
this.response.setStatus(HttpResponseStatus.valueOf(statusCode.value()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body) {
|
||||
Observable<ByteBuf> content = RxReactiveStreams.toObservable(body)
|
||||
.map(NettyDataBufferFactory::toByteBuf);
|
||||
return Flux.from(RxReactiveStreams.toPublisher(this.response.write(content, new FlushSelector(FLUSH_THRESHOLD))))
|
||||
.then();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> writeAndFlushWithInternal(
|
||||
Publisher<? extends Publisher<? extends DataBuffer>> body) {
|
||||
Flux<ByteBuf> bodyWithFlushSignals = Flux.from(body).
|
||||
flatMap(publisher -> Flux.from(publisher).
|
||||
map(NettyDataBufferFactory::toByteBuf).
|
||||
concatWith(Mono.just(FLUSH_SIGNAL)));
|
||||
Observable<ByteBuf> content = RxReactiveStreams.toObservable(bodyWithFlushSignals);
|
||||
ResponseContentWriter<ByteBuf> writer = this.response.write(content, bb -> bb == FLUSH_SIGNAL);
|
||||
return Flux.from(RxReactiveStreams.toPublisher(writer)).then();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void applyHeaders() {
|
||||
for (String name : getHeaders().keySet()) {
|
||||
for (String value : getHeaders().get(name)) {
|
||||
this.response.addHeader(name, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void applyCookies() {
|
||||
for (String name : getCookies().keySet()) {
|
||||
for (ResponseCookie httpCookie : getCookies().get(name)) {
|
||||
Cookie cookie = new DefaultCookie(name, httpCookie.getValue());
|
||||
if (!httpCookie.getMaxAge().isNegative()) {
|
||||
cookie.setMaxAge(httpCookie.getMaxAge().getSeconds());
|
||||
}
|
||||
httpCookie.getDomain().ifPresent(cookie::setDomain);
|
||||
httpCookie.getPath().ifPresent(cookie::setPath);
|
||||
cookie.setSecure(httpCookie.isSecure());
|
||||
cookie.setHttpOnly(httpCookie.isHttpOnly());
|
||||
this.response.addCookie(cookie);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class FlushSelector implements Func1<ByteBuf, Boolean> {
|
||||
|
||||
private final long flushEvery;
|
||||
private long count;
|
||||
|
||||
public FlushSelector(long flushEvery) {
|
||||
this.flushEvery = flushEvery;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean call(ByteBuf byteBuf) {
|
||||
this.count += byteBuf.readableBytes();
|
||||
if (this.count >= this.flushEvery) {
|
||||
this.count = 0;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
While the underlying implementation of {@link ZeroCopyHttpOutputMessage} seems to
|
||||
work; it does bypass {@link #applyBeforeCommit} and more importantly it doesn't change
|
||||
its {@linkplain #state()). Therefore it's commented out, for now.
|
||||
|
||||
We should revisit this code once
|
||||
https://github.com/ReactiveX/RxNetty/issues/194 has been fixed.
|
||||
|
||||
|
||||
@Override
|
||||
public Mono<Void> writeWith(File file, long position, long count) {
|
||||
Channel channel = this.response.unsafeNettyChannel();
|
||||
|
||||
HttpResponse httpResponse =
|
||||
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
|
||||
io.netty.handler.codec.http.HttpHeaders headers = httpResponse.headers();
|
||||
|
||||
for (Map.Entry<String, List<String>> header : getHeaders().entrySet()) {
|
||||
String headerName = header.getKey();
|
||||
for (String headerValue : header.getValue()) {
|
||||
headers.add(headerName, headerValue);
|
||||
}
|
||||
}
|
||||
Mono<Void> responseWrite = MonoChannelFuture.from(channel.write(httpResponse));
|
||||
|
||||
FileRegion fileRegion = new DefaultFileRegion(file, position, count);
|
||||
Mono<Void> fileWrite = MonoChannelFuture.from(channel.writeAndFlush(fileRegion));
|
||||
|
||||
return Flux.concat(applyBeforeCommit(), responseWrite, fileWrite).then();
|
||||
}
|
||||
*/
|
||||
}
|
||||
Reference in New Issue
Block a user