Add support for Jetty Reactive Streams HTTP client

Leverage https://github.com/jetty-project/jetty-reactive-httpclient
to add support for Jetty in WebClient via JettyClientHttpConnector.

Implemented with buffer copy instead of optimized buffer wrapping
because the latter hangs since Callback#succeeded doesn't allow
releasing the buffer and requesting more data at different times
(required for Mono<DataBuffer> for example).
See https://github.com/eclipse/jetty.project/issues/2429.

Issue: SPR-15092
This commit is contained in:
sdeleuze
2018-04-11 10:42:36 +02:00
committed by Sebastien Deleuze
parent 3c9049d530
commit a87764f1fd
8 changed files with 465 additions and 3 deletions

View File

@@ -0,0 +1,143 @@
/*
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import java.util.function.Function;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.Callback;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpMethod;
/**
* Jetty ReactiveStreams HttpClient implementation of {@link ClientHttpConnector}.
*
* Implemented with buffer copy instead of optimized buffer wrapping because the latter
* hangs since {@link Callback#succeeded()} doesn't allow releasing the buffer and
* requesting more data at different times (required for {@code Mono<DataBuffer>} for example).
* See https://github.com/eclipse/jetty.project/issues/2429 for more details.
*
* @author Sebastien Deleuze
* @since 5.1
* @see <a href="https://github.com/jetty-project/jetty-reactive-httpclient">Jetty ReactiveStreams HttpClient</a>
*/
public class JettyClientHttpConnector implements ClientHttpConnector, SmartLifecycle {
private final HttpClient httpClient;
private DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
/**
* Create a Jetty {@link ClientHttpConnector} with the default {@link HttpClient}.
*/
public JettyClientHttpConnector() {
this(new HttpClient());
}
/**
* Create a Jetty {@link ClientHttpConnector} with the given {@link HttpClient}.
*/
public JettyClientHttpConnector(HttpClient httpClient) {
this.httpClient = httpClient;
}
public void setBufferFactory(DataBufferFactory bufferFactory) {
this.bufferFactory = bufferFactory;
}
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public boolean isRunning() {
return this.httpClient.isRunning();
}
@Override
public void start() {
try {
// HttpClient is internally synchronized and protected with state checks
this.httpClient.start();
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@Override
public void stop() {
try {
this.httpClient.stop();
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
if (!uri.isAbsolute()) {
return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
}
if (!this.httpClient.isStarted()) {
try {
this.httpClient.start();
}
catch (Exception ex) {
return Mono.error(ex);
}
}
JettyClientHttpRequest clientHttpRequest = new JettyClientHttpRequest(
this.httpClient.newRequest(uri).method(method.toString()), this.bufferFactory);
return requestCallback.apply(clientHttpRequest).then(Mono.from(
clientHttpRequest.getReactiveRequest().response((reactiveResponse, contentChunks) -> {
Flux<DataBuffer> content = Flux.from(contentChunks).map(chunk -> {
DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity());
buffer.write(chunk.buffer);
chunk.callback.succeeded();
return buffer;
});
return Mono.just(new JettyClientHttpResponse(reactiveResponse, content));
})));
}
}

View File

@@ -0,0 +1,148 @@
/*
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.HttpCookie;
import java.net.URI;
import java.util.Collection;
import java.util.function.Function;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.reactive.client.ContentChunk;
import org.eclipse.jetty.reactive.client.ReactiveRequest;
import org.eclipse.jetty.util.Callback;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* {@link ClientHttpRequest} implementation for the Jetty ReactiveStreams HTTP client.
*
* @author Sebastien Deleuze
* @since 5.1
* @see <a href="https://github.com/jetty-project/jetty-reactive-httpclient">Jetty ReactiveStreams HttpClient</a>
*/
class JettyClientHttpRequest extends AbstractClientHttpRequest {
private final Request jettyRequest;
private final DataBufferFactory bufferFactory;
@Nullable
private ReactiveRequest reactiveRequest;
public JettyClientHttpRequest(Request jettyRequest, DataBufferFactory bufferFactory) {
this.jettyRequest = jettyRequest;
this.bufferFactory = bufferFactory;
}
@Override
public HttpMethod getMethod() {
HttpMethod method = HttpMethod.resolve(this.jettyRequest.getMethod());
Assert.state(method != null, "Method must not be null");
return method;
}
@Override
public URI getURI() {
return this.jettyRequest.getURI();
}
@Override
public Mono<Void> setComplete() {
return doCommit(this::completes);
}
@Override
public DataBufferFactory bufferFactory() {
return this.bufferFactory;
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> publisher) {
Flux<ContentChunk> chunks = Flux.from(publisher).map(this::toContentChunk);
MediaType contentType = getHeaders().getContentType();
ReactiveRequest.Content requestContent = ReactiveRequest.Content.fromPublisher(chunks,
(contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE));
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(requestContent).build();
return doCommit(this::completes);
}
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
String contentType = this.jettyRequest.getHeaders().getField(HttpHeader.CONTENT_TYPE).getValue();
Flux<ContentChunk> chunks = Flux.from(body).flatMap(Function.identity()).map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, contentType);
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
return doCommit(this::completes);
}
private Mono<Void> completes() {
return Mono.empty();
}
private ContentChunk toContentChunk(DataBuffer buffer) {
return new ContentChunk(buffer.asByteBuffer(), new Callback() {
@Override
public void succeeded() {
DataBufferUtils.release(buffer);
}
@Override
public void failed(Throwable x) {
DataBufferUtils.release(buffer);
throw Exceptions.propagate(x);
}
});
}
@Override
protected void applyCookies() {
getCookies().values().stream().flatMap(Collection::stream)
.map(cookie -> new HttpCookie(cookie.getName(), cookie.getValue()))
.forEach(this.jettyRequest::cookie);
}
@Override
protected void applyHeaders() {
HttpHeaders headers = getHeaders();
headers.forEach((key, value) -> value.forEach(v -> this.jettyRequest.header(key, v)));
if (!headers.containsKey(HttpHeaders.ACCEPT)) {
this.jettyRequest.header(HttpHeaders.ACCEPT, "*/*");
}
}
ReactiveRequest getReactiveRequest() {
if (this.reactiveRequest == null) {
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).build();
}
return this.reactiveRequest;
}
}

View File

@@ -0,0 +1,94 @@
/*
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.HttpCookie;
import org.eclipse.jetty.reactive.client.ReactiveResponse;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* {@link ClientHttpResponse} implementation for the Jetty ReactiveStreams HTTP client.
*
* @author Sebastien Deleuze
* @since 5.1
* @see <a href="https://github.com/jetty-project/jetty-reactive-httpclient">Jetty ReactiveStreams HttpClient</a>
*/
class JettyClientHttpResponse implements ClientHttpResponse {
private final ReactiveResponse reactiveResponse;
private final Flux<DataBuffer> content;
public JettyClientHttpResponse(ReactiveResponse reactiveResponse, Publisher<DataBuffer> content) {
Assert.notNull(reactiveResponse, "reactiveResponse should not be null");
Assert.notNull(content, "content should not be null");
this.reactiveResponse = reactiveResponse;
this.content = Flux.from(content);
}
@Override
public HttpStatus getStatusCode() {
return HttpStatus.valueOf(getRawStatusCode());
}
@Override
public int getRawStatusCode() {
return this.reactiveResponse.getStatus();
}
@Override
public MultiValueMap<String, ResponseCookie> getCookies() {
MultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>();
getHeaders().get(HttpHeaders.SET_COOKIE).forEach(header -> {
HttpCookie.parse(header).forEach(cookie -> result.add(cookie.getName(), ResponseCookie.from(cookie.getName(), cookie.getValue())
.domain(cookie.getDomain())
.path(cookie.getPath())
.maxAge(cookie.getMaxAge())
.secure(cookie.getSecure())
.httpOnly(cookie.isHttpOnly())
.build()));
});
return CollectionUtils.unmodifiableMultiValueMap(result);
}
@Override
public Flux<DataBuffer> getBody() {
return this.content;
}
@Override
public HttpHeaders getHeaders() {
HttpHeaders headers = new HttpHeaders();
this.reactiveResponse.getHeaders().stream()
.forEach(e -> headers.add(e.getName(), e.getValue()));
return headers;
}
}