diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 76106fd7e2..3cfdc07321 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -22,11 +22,14 @@ import java.util.function.Function; import org.springframework.http.HttpMethod; import reactor.core.publisher.Mono; +import reactor.io.netty.http.HttpException; +import reactor.io.netty.http.HttpInbound; /** * Reactor-Netty implementation of {@link ClientHttpConnector} * * @author Brian Clozel + * @see reactor.io.netty.http.HttpClient * @since 5.0 */ public class ReactorClientHttpConnector implements ClientHttpConnector { @@ -38,7 +41,10 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { return reactor.io.netty.http.HttpClient.create(uri.getHost(), uri.getPort()) .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()), uri.toString(), - httpOutbound -> requestCallback.apply(new ReactorClientHttpRequest(method, uri, httpOutbound))) + httpClientRequest -> requestCallback + .apply(new ReactorClientHttpRequest(method, uri, httpClientRequest))) + .cast(HttpInbound.class) + .otherwise(HttpException.class, exc -> Mono.just(exc.getChannel())) .map(httpInbound -> new ReactorClientHttpResponse(httpInbound)); } } \ No newline at end of file diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index 1cec205463..f90cad9def 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -19,7 +19,7 @@ package org.springframework.http.client.reactive; import java.util.Collection; import reactor.core.publisher.Flux; -import reactor.io.netty.http.HttpClientResponse; +import reactor.io.netty.http.HttpInbound; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; @@ -34,16 +34,16 @@ import org.springframework.util.MultiValueMap; * {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client * * @author Brian Clozel - * @since 5.0 * @see reactor.io.netty.http.HttpClient + * @since 5.0 */ public class ReactorClientHttpResponse implements ClientHttpResponse { private final NettyDataBufferFactory dataBufferFactory; - private final HttpClientResponse response; + private final HttpInbound response; - public ReactorClientHttpResponse(HttpClientResponse response) { + public ReactorClientHttpResponse(HttpInbound response) { this.response = response; this.dataBufferFactory = new NettyDataBufferFactory(response.delegate().alloc()); } diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/BodyExtractor.java b/spring-web/src/main/java/org/springframework/web/client/reactive/BodyExtractor.java new file mode 100644 index 0000000000..9bc2837c1c --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/BodyExtractor.java @@ -0,0 +1,43 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.client.reactive; + +import java.util.List; + +import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.http.converter.reactive.HttpMessageConverter; + +/** + * A {@code BodyExtractor} extracts the content of a raw {@link ClientHttpResponse}, + * decoding the response body and using a target composition API. + * + *

See static factory methods in {@link ResponseExtractors} + * and {@link org.springframework.web.client.reactive.support.RxJava1ResponseExtractors}. + * + * @author Brian Clozel + * @since 5.0 + */ +public interface BodyExtractor { + + /** + * Extract content from the response body + * @param clientResponse the raw HTTP response + * @param messageConverters the message converters that decode the response body + * @return the relevant content + */ + T extract(ClientHttpResponse clientResponse, List> messageConverters); +} diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/DefaultResponseErrorHandler.java b/spring-web/src/main/java/org/springframework/web/client/reactive/DefaultResponseErrorHandler.java new file mode 100644 index 0000000000..5769cbd658 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/DefaultResponseErrorHandler.java @@ -0,0 +1,45 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.client.reactive; + +import java.util.List; + +import org.springframework.http.HttpStatus; +import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.http.converter.reactive.HttpMessageConverter; + +/** + * Default implementation of the {@link ResponseErrorHandler} interface + * that throws {@link WebClientErrorException}s for HTTP 4xx responses + * and {@link WebServerErrorException}s for HTTP 5xx responses. + * + * @author Brian Clozel + * @since 5.0 + */ +public class DefaultResponseErrorHandler implements ResponseErrorHandler { + + @Override + public void handleError(ClientHttpResponse response, List> messageConverters) { + HttpStatus responseStatus = response.getStatusCode(); + if (responseStatus.is4xxClientError()) { + throw new WebClientErrorException(response, messageConverters); + } + if (responseStatus.is5xxServerError()) { + throw new WebServerErrorException(response, messageConverters); + } + } +} diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/ResponseErrorHandler.java b/spring-web/src/main/java/org/springframework/web/client/reactive/ResponseErrorHandler.java new file mode 100644 index 0000000000..3f7ac52c9d --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/ResponseErrorHandler.java @@ -0,0 +1,41 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.client.reactive; + +import java.util.List; + +import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.http.converter.reactive.HttpMessageConverter; + +/** + * Strategy interface used by the {@link WebClient} to handle + * errors in {@link ClientHttpResponse}s if needed. + * + * @author Brian Clozel + * @see DefaultResponseErrorHandler + * @since 5.0 + */ +public interface ResponseErrorHandler { + + /** + * Handle the error in the given response. + * Implementations will typically inspect the {@link ClientHttpResponse#getStatusCode() HttpStatus} + * of the response and throw {@link WebClientException}s in case of errors. + */ + void handleError(ClientHttpResponse response, List> messageConverters); + +} diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/ResponseExtractor.java b/spring-web/src/main/java/org/springframework/web/client/reactive/ResponseExtractor.java index 501391f3b3..9c46e3b1d0 100644 --- a/spring-web/src/main/java/org/springframework/web/client/reactive/ResponseExtractor.java +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/ResponseExtractor.java @@ -16,24 +16,28 @@ package org.springframework.web.client.reactive; -import java.util.List; - import reactor.core.publisher.Mono; import org.springframework.http.client.reactive.ClientHttpResponse; -import org.springframework.http.converter.reactive.HttpMessageConverter; /** * A {@code ResponseExtractor} extracts the relevant part of a * raw {@link org.springframework.http.client.reactive.ClientHttpResponse}, * optionally decoding the response body and using a target composition API. * - *

See static factory methods in {@link ResponseExtractors}. + *

See static factory methods in {@link ResponseExtractors} and + * {@link org.springframework.web.client.reactive.support.RxJava1ResponseExtractors}. * * @author Brian Clozel * @since 5.0 */ public interface ResponseExtractor { - T extract(Mono clientResponse, List> messageConverters); + /** + * Extract content from the response + * @param clientResponse the raw HTTP response + * @param webClientConfig the {@link WebClient} configuration information + * @return the relevant part of the response + */ + T extract(Mono clientResponse, WebClientConfig webClientConfig); } diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/ResponseExtractors.java b/spring-web/src/main/java/org/springframework/web/client/reactive/ResponseExtractors.java index f3fedce1e5..39e34619ed 100644 --- a/spring-web/src/main/java/org/springframework/web/client/reactive/ResponseExtractors.java +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/ResponseExtractors.java @@ -17,7 +17,9 @@ package org.springframework.web.client.reactive; import java.util.List; -import java.util.Optional; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.http.HttpHeaders; @@ -26,12 +28,9 @@ import org.springframework.http.ResponseEntity; import org.springframework.http.client.reactive.ClientHttpResponse; import org.springframework.http.converter.reactive.HttpMessageConverter; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - /** - * Static factory methods for {@link ResponseExtractor} based on the {@link Flux} and - * {@link Mono} APIs. + * Static factory methods for {@link ResponseExtractor} and {@link BodyExtractor}, + * based on the {@link Flux} and {@link Mono} APIs. * * @author Brian Clozel * @since 5.0 @@ -46,9 +45,11 @@ public class ResponseExtractors { */ @SuppressWarnings("unchecked") public static ResponseExtractor> body(ResolvableType bodyType) { - return (clientResponse, messageConverters) -> (Mono) clientResponse - .flatMap(resp -> decodeResponseBody(resp, bodyType, - messageConverters)) + return (clientResponse, webClientConfig) -> (Mono) clientResponse + .doOnNext(response -> webClientConfig.getResponseErrorHandler() + .handleError(response, webClientConfig.getMessageConverters())) + .flatMap(resp -> decodeResponseBodyAsMono(resp, bodyType, + webClientConfig.getMessageConverters())) .next(); } @@ -60,13 +61,33 @@ public class ResponseExtractors { return body(bodyType); } + /** + * Extract the response body and decode it, returning it as a {@code Mono}. + * @see ResolvableType#forClassWithGenerics(Class, Class[]) + */ + @SuppressWarnings("unchecked") + public static BodyExtractor> as(ResolvableType bodyType) { + return (clientResponse, messageConverters) -> + decodeResponseBodyAsMono(clientResponse, bodyType, messageConverters); + } + + /** + * Extract the response body and decode it, returning it as a {@code Mono} + */ + public static BodyExtractor> as(Class sourceClass) { + ResolvableType bodyType = ResolvableType.forClass(sourceClass); + return as(bodyType); + } + /** * Extract the response body and decode it, returning it as a {@code Flux}. * @see ResolvableType#forClassWithGenerics(Class, Class[]) */ public static ResponseExtractor> bodyStream(ResolvableType bodyType) { - return (clientResponse, messageConverters) -> clientResponse - .flatMap(resp -> decodeResponseBody(resp, bodyType, messageConverters)); + return (clientResponse, webClientConfig) -> clientResponse + .doOnNext(response -> webClientConfig.getResponseErrorHandler() + .handleError(response, webClientConfig.getMessageConverters())) + .flatMap(resp -> decodeResponseBody(resp, bodyType, webClientConfig.getMessageConverters())); } /** @@ -77,22 +98,39 @@ public class ResponseExtractors { return bodyStream(bodyType); } + /** + * Extract the response body and decode it, returning it as a {@code Flux} + * @see ResolvableType#forClassWithGenerics(Class, Class[]) + */ + @SuppressWarnings("unchecked") + public static BodyExtractor> asStream(ResolvableType bodyType) { + return (clientResponse, messageConverters) -> + (Flux) decodeResponseBody(clientResponse, bodyType, messageConverters); + } + + /** + * Extract the response body and decode it, returning it as a {@code Flux} + */ + public static BodyExtractor> asStream(Class sourceClass) { + ResolvableType bodyType = ResolvableType.forClass(sourceClass); + return asStream(bodyType); + } + /** * Extract the full response body as a {@code ResponseEntity} with its body decoded as * a single type {@code T}. * @see ResolvableType#forClassWithGenerics(Class, Class[]) */ @SuppressWarnings("unchecked") - public static ResponseExtractor>> response( - ResolvableType bodyType) { - return (clientResponse, messageConverters) -> clientResponse.then(response -> { - return Mono.when( - decodeResponseBody(response, bodyType, - messageConverters).next().defaultIfEmpty( - EMPTY_BODY), - Mono.just(response.getHeaders()), - Mono.just(response.getStatusCode())); - }).map(tuple -> { + public static ResponseExtractor>> response(ResolvableType bodyType) { + + return (clientResponse, webClientConfig) -> clientResponse.then(response -> + Mono.when( + decodeResponseBodyAsMono(response, bodyType, + webClientConfig.getMessageConverters()).defaultIfEmpty(EMPTY_BODY), + Mono.just(response.getHeaders()), + Mono.just(response.getStatusCode())) + ).map(tuple -> { Object body = (tuple.getT1() != EMPTY_BODY ? tuple.getT1() : null); return new ResponseEntity<>((T) body, tuple.getT2(), tuple.getT3()); }); @@ -102,8 +140,7 @@ public class ResponseExtractors { * Extract the full response body as a {@code ResponseEntity} with its body decoded as * a single type {@code T}. */ - public static ResponseExtractor>> response( - Class bodyClass) { + public static ResponseExtractor>> response(Class bodyClass) { ResolvableType bodyType = ResolvableType.forClass(bodyClass); return response(bodyType); } @@ -113,11 +150,10 @@ public class ResponseExtractors { * a {@code Flux}. * @see ResolvableType#forClassWithGenerics(Class, Class[]) */ - public static ResponseExtractor>>> responseStream( - ResolvableType type) { - return (clientResponse, messageConverters) -> clientResponse + public static ResponseExtractor>>> responseStream(ResolvableType type) { + return (clientResponse, webClientConfig) -> clientResponse .map(response -> new ResponseEntity<>( - ResponseExtractors. decodeResponseBody(response, type, messageConverters), + decodeResponseBody(response, type, webClientConfig.getMessageConverters()), response.getHeaders(), response.getStatusCode())); } @@ -125,8 +161,7 @@ public class ResponseExtractors { * Extract the full response body as a {@code ResponseEntity} with its body decoded as * a {@code Flux}. */ - public static ResponseExtractor>>> responseStream( - Class sourceClass) { + public static ResponseExtractor>>> responseStream(Class sourceClass) { ResolvableType resolvableType = ResolvableType.forClass(sourceClass); return responseStream(resolvableType); } @@ -135,30 +170,37 @@ public class ResponseExtractors { * Extract the response headers as an {@code HttpHeaders} instance. */ public static ResponseExtractor> headers() { - return (clientResponse, messageConverters) -> clientResponse.map(resp -> resp.getHeaders()); + return (clientResponse, webClientConfig) -> clientResponse.map(resp -> resp.getHeaders()); } @SuppressWarnings("unchecked") protected static Flux decodeResponseBody(ClientHttpResponse response, - ResolvableType responseType, - List> messageConverters) { + ResolvableType responseType, List> messageConverters) { MediaType contentType = response.getHeaders().getContentType(); - Optional> converter = resolveConverter(messageConverters, - responseType, contentType); - if (!converter.isPresent()) { - return Flux.error(new IllegalStateException( - "Could not decode response body of type '" + contentType - + "' with target type '" + responseType.toString() + "'")); - } - return (Flux) converter.get().read(responseType, response); + HttpMessageConverter converter = resolveConverter(messageConverters, responseType, contentType); + return (Flux) converter.read(responseType, response); } - protected static Optional> resolveConverter( - List> messageConverters, ResolvableType type, - MediaType mediaType) { - return messageConverters.stream().filter(e -> e.canRead(type, mediaType)) - .findFirst(); + @SuppressWarnings("unchecked") + protected static Mono decodeResponseBodyAsMono(ClientHttpResponse response, + ResolvableType responseType, List> messageConverters) { + + MediaType contentType = response.getHeaders().getContentType(); + HttpMessageConverter converter = resolveConverter(messageConverters, responseType, contentType); + return (Mono) converter.readMono(responseType, response); + } + + protected static HttpMessageConverter resolveConverter( + List> messageConverters, ResolvableType responseType, MediaType contentType) { + + return messageConverters.stream() + .filter(e -> e.canRead(responseType, contentType)) + .findFirst() + .orElseThrow(() -> + new WebClientException( + "Could not decode response body of type '" + contentType + + "' with target type '" + responseType.toString() + "'")); } } diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/WebClient.java b/spring-web/src/main/java/org/springframework/web/client/reactive/WebClient.java index c68fb26c5b..b06e49de8e 100644 --- a/spring-web/src/main/java/org/springframework/web/client/reactive/WebClient.java +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/WebClient.java @@ -92,7 +92,7 @@ public final class WebClient { private ClientHttpConnector clientHttpConnector; - private List> messageConverters; + private final DefaultWebClientConfig webClientConfig; /** * Create a {@code WebClient} instance, using the {@link ClientHttpConnector} @@ -111,8 +111,11 @@ public final class WebClient { */ public WebClient(ClientHttpConnector clientHttpConnector) { this.clientHttpConnector = clientHttpConnector; - this.messageConverters = new ArrayList<>(); - addDefaultHttpMessageConverters(this.messageConverters); + this.webClientConfig = new DefaultWebClientConfig(); + List> converters = new ArrayList<>(); + addDefaultHttpMessageConverters(converters); + this.webClientConfig.setMessageConverters(converters); + this.webClientConfig.setResponseErrorHandler(new DefaultResponseErrorHandler()); } /** @@ -141,7 +144,14 @@ public final class WebClient { * messages */ public void setMessageConverters(List> messageConverters) { - this.messageConverters = messageConverters; + this.webClientConfig.setMessageConverters(messageConverters); + } + + /** + * Set the {@link ResponseErrorHandler} to use for handling HTTP response errors + */ + public void setResponseErrorHandler(ResponseErrorHandler responseErrorHandler) { + this.webClientConfig.setResponseErrorHandler(responseErrorHandler); } /** @@ -167,17 +177,41 @@ public final class WebClient { return new WebResponseActions() { @Override public void doWithStatus(Consumer consumer) { - clientResponse.doOnNext(clientHttpResponse -> - consumer.accept(clientHttpResponse.getStatusCode())); + clientResponse.doOnNext(clientHttpResponse -> consumer.accept(clientHttpResponse.getStatusCode())); } @Override public T extract(ResponseExtractor extractor) { - return extractor.extract(clientResponse, messageConverters); + return extractor.extract(clientResponse, webClientConfig); } }; } + protected class DefaultWebClientConfig implements WebClientConfig { + + private List> messageConverters; + + private ResponseErrorHandler responseErrorHandler; + + @Override + public List> getMessageConverters() { + return messageConverters; + } + + public void setMessageConverters(List> messageConverters) { + this.messageConverters = messageConverters; + } + + @Override + public ResponseErrorHandler getResponseErrorHandler() { + return responseErrorHandler; + } + + public void setResponseErrorHandler(ResponseErrorHandler responseErrorHandler) { + this.responseErrorHandler = responseErrorHandler; + } + } + protected class DefaultRequestCallback implements Function> { private final ClientWebRequest clientWebRequest; @@ -198,7 +232,8 @@ public final class WebClient { .forEach(cookie -> clientHttpRequest.getCookies().add(cookie.getName(), cookie)); if (this.clientWebRequest.getBody() != null) { return writeRequestBody(this.clientWebRequest.getBody(), - this.clientWebRequest.getElementType(), clientHttpRequest, messageConverters); + this.clientWebRequest.getElementType(), + clientHttpRequest, webClientConfig.getMessageConverters()); } else { return clientHttpRequest.setComplete(); diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientConfig.java b/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientConfig.java new file mode 100644 index 0000000000..9ab326b7d6 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientConfig.java @@ -0,0 +1,41 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.client.reactive; + +import java.util.List; + +import org.springframework.http.converter.reactive.HttpMessageConverter; + +/** + * Interface that makes the {@link WebClient} configuration information + * available to downstream infrastructure such as {@link ResponseErrorHandler}s. + * + * @author Brian Clozel + * @since 5.0 + */ +public interface WebClientConfig { + + /** + * Return the message converters that can help encoding/decoding the HTTP message body + */ + List> getMessageConverters(); + + /** + * Return the configured {@link ResponseErrorHandler} + */ + ResponseErrorHandler getResponseErrorHandler(); +} diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientErrorException.java b/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientErrorException.java new file mode 100644 index 0000000000..637418e2bb --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientErrorException.java @@ -0,0 +1,45 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.client.reactive; + +import java.util.List; + +import org.springframework.http.HttpStatus; +import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.http.converter.reactive.HttpMessageConverter; + +/** + * Exception thrown when an HTTP 4xx is received. + * + * @author Brian Clozel + * @since 5.0 + */ +@SuppressWarnings("serial") +public class WebClientErrorException extends WebClientResponseException { + + /** + * Construct a new instance of {@code HttpClientErrorException} based on a {@link ClientHttpResponse} + * and {@link HttpMessageConverter}s to optionally help decoding the response body + * @param clientResponse the HTTP response + * @param messageConverters the message converters that may decode the HTTP response body + */ + public WebClientErrorException(ClientHttpResponse clientResponse, + List> messageConverters) { + super(clientResponse.getStatusCode().value() + " " + clientResponse.getStatusCode().getReasonPhrase(), + clientResponse, messageConverters); + } +} diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientException.java b/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientException.java index bb1f366acb..c6ca477d3d 100644 --- a/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientException.java +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientException.java @@ -20,7 +20,7 @@ import org.springframework.core.NestedRuntimeException; /** * Base class for exceptions thrown by {@link WebClient} whenever - * it encounters client-side errors. + * it encounters errors. * * @author Brian Clozel * @since 5.0 diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientResponseException.java b/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientResponseException.java new file mode 100644 index 0000000000..e273d9b93d --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/WebClientResponseException.java @@ -0,0 +1,78 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.client.reactive; + +import java.util.List; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.http.converter.reactive.HttpMessageConverter; + +/** + * Base class for exceptions associated with specific HTTP client response status codes. + * + * @author Brian Clozel + * @since 5.0 + */ +@SuppressWarnings("serial") +public class WebClientResponseException extends WebClientException { + + private final ClientHttpResponse clientResponse; + + private final List> messageConverters; + + /** + * Construct a new instance of {@code WebClientResponseException} with the given response data + * @param message the given error message + * @param clientResponse the HTTP response + * @param messageConverters the message converters that maay decode the HTTP response body + */ + public WebClientResponseException(String message, ClientHttpResponse clientResponse, + List> messageConverters) { + super(message); + this.clientResponse = clientResponse; + this.messageConverters = messageConverters; + } + + /** + * Return the HTTP status + */ + public HttpStatus getStatus() { + return this.clientResponse.getStatusCode(); + } + + /** + * Return the HTTP response headers + */ + public HttpHeaders getResponseHeaders() { + return this.clientResponse.getHeaders(); + } + + /** + * Perform an extraction of the response body into a higher level representation. + * + *

+	 * static imports: ResponseExtractors.*
+	 *
+	 * String responseBody = clientResponse.getResponseBody(as(String.class));
+	 * 
+ */ + public T getResponseBody(BodyExtractor extractor) { + return extractor.extract(this.clientResponse, this.messageConverters); + } +} diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/WebResponseActions.java b/spring-web/src/main/java/org/springframework/web/client/reactive/WebResponseActions.java index 2add96ebaa..4c9167c812 100644 --- a/spring-web/src/main/java/org/springframework/web/client/reactive/WebResponseActions.java +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/WebResponseActions.java @@ -39,11 +39,11 @@ public interface WebResponseActions { * Perform an extraction of the response body into a higher level representation. * *
-	 * static imports: HttpRequestBuilders.*, HttpResponseExtractors.*
+	 * static imports: ClientWebRequestBuilder.*, ResponseExtractors.*
 	 *
 	 * webClient
-	 *   .perform(get(baseUrl.toString()).accept(MediaType.TEXT_PLAIN))
-	 *   .extract(response(String.class));
+	 *   .perform(get(url).accept(MediaType.TEXT_PLAIN))
+	 *   .extract(body(String.class));
 	 * 
*/ T extract(ResponseExtractor extractor); diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/WebServerErrorException.java b/spring-web/src/main/java/org/springframework/web/client/reactive/WebServerErrorException.java new file mode 100644 index 0000000000..e5f1b25b5c --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/WebServerErrorException.java @@ -0,0 +1,44 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.client.reactive; + +import java.util.List; + +import org.springframework.http.HttpStatus; +import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.http.converter.reactive.HttpMessageConverter; + +/** + * Exception thrown when an HTTP 5xx is received. + * + * @author Brian Clozel + * @since 5.0 + */ +@SuppressWarnings("serial") +public class WebServerErrorException extends WebClientResponseException { + + /** + * Construct a new instance of {@code HttpServerErrorException} based on a {@link ClientHttpResponse} + * and {@link HttpMessageConverter}s to optionally help decoding the response body + * @param clientResponse the HTTP response + * @param messageConverters the message converters that may decode the HTTP response body + */ + public WebServerErrorException(ClientHttpResponse clientResponse, List> messageConverters) { + super(clientResponse.getStatusCode().value() + " " + clientResponse.getStatusCode().getReasonPhrase(), + clientResponse, messageConverters); + } +} diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/support/RxJava1ResponseExtractors.java b/spring-web/src/main/java/org/springframework/web/client/reactive/support/RxJava1ResponseExtractors.java index 38c43c834d..842c8fd1c4 100644 --- a/spring-web/src/main/java/org/springframework/web/client/reactive/support/RxJava1ResponseExtractors.java +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/support/RxJava1ResponseExtractors.java @@ -17,7 +17,6 @@ package org.springframework.web.client.reactive.support; import java.util.List; -import java.util.Optional; import reactor.adapter.RxJava1Adapter; import reactor.core.publisher.Flux; @@ -31,10 +30,12 @@ import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.http.client.reactive.ClientHttpResponse; import org.springframework.http.converter.reactive.HttpMessageConverter; +import org.springframework.web.client.reactive.BodyExtractor; import org.springframework.web.client.reactive.ResponseExtractor; +import org.springframework.web.client.reactive.WebClientException; /** - * Static factory methods for {@link ResponseExtractor} + * Static factory methods for {@link ResponseExtractor} and {@link BodyExtractor}, * based on the {@link Observable} and {@link Single} APIs. * * @author Brian Clozel @@ -42,16 +43,60 @@ import org.springframework.web.client.reactive.ResponseExtractor; */ public class RxJava1ResponseExtractors { + /** + * Extract the response body and decode it, returning it as a {@code Single}. + * @see ResolvableType#forClassWithGenerics(Class, Class[]) + */ + @SuppressWarnings("unchecked") + public static ResponseExtractor> body(ResolvableType bodyType) { + + return (clientResponse, webClientConfig) -> (Single) RxJava1Adapter + .publisherToSingle(clientResponse + .doOnNext(response -> webClientConfig.getResponseErrorHandler() + .handleError(response, webClientConfig.getMessageConverters())) + .flatMap(resp -> decodeResponseBodyAsMono(resp, bodyType, webClientConfig.getMessageConverters()))); + } + /** * Extract the response body and decode it, returning it as a {@code Single}. */ @SuppressWarnings("unchecked") public static ResponseExtractor> body(Class sourceClass) { - ResolvableType resolvableType = ResolvableType.forClass(sourceClass); - return (clientResponse, messageConverters) -> (Single) RxJava1Adapter - .publisherToSingle(clientResponse - .flatMap(resp -> decodeResponseBody(resp, resolvableType, messageConverters)).next()); + ResolvableType bodyType = ResolvableType.forClass(sourceClass); + return body(bodyType); + } + + /** + * Extract the response body and decode it, returning it as a {@code Single}. + * @see ResolvableType#forClassWithGenerics(Class, Class[]) + */ + @SuppressWarnings("unchecked") + public static BodyExtractor> as(ResolvableType bodyType) { + return (clientResponse, messageConverters) -> + (Single) RxJava1Adapter.publisherToSingle( + decodeResponseBodyAsMono(clientResponse, bodyType, messageConverters)); + } + + /** + * Extract the response body and decode it, returning it as a {@code Single} + */ + public static BodyExtractor> as(Class sourceClass) { + ResolvableType bodyType = ResolvableType.forClass(sourceClass); + return as(bodyType); + } + + /** + * Extract the response body and decode it, returning it as an {@code Observable} + * @see ResolvableType#forClassWithGenerics(Class, Class[]) + */ + public static ResponseExtractor> bodyStream(ResolvableType bodyType) { + + return (clientResponse, webClientConfig) -> RxJava1Adapter + .publisherToObservable(clientResponse + .doOnNext(response -> webClientConfig.getResponseErrorHandler() + .handleError(response, webClientConfig.getMessageConverters())) + .flatMap(resp -> decodeResponseBody(resp, bodyType, webClientConfig.getMessageConverters()))); } /** @@ -59,29 +104,56 @@ public class RxJava1ResponseExtractors { */ public static ResponseExtractor> bodyStream(Class sourceClass) { - ResolvableType resolvableType = ResolvableType.forClass(sourceClass); - return (clientResponse, messageConverters) -> RxJava1Adapter - .publisherToObservable(clientResponse - .flatMap(resp -> decodeResponseBody(resp, resolvableType, messageConverters))); + ResolvableType bodyType = ResolvableType.forClass(sourceClass); + return bodyStream(bodyType); + } + + /** + * Extract the response body and decode it, returning it as a {@code Observable}. + * @see ResolvableType#forClassWithGenerics(Class, Class[]) + */ + @SuppressWarnings("unchecked") + public static BodyExtractor> asStream(ResolvableType bodyType) { + return (clientResponse, messageConverters) -> + (Observable) RxJava1Adapter + .publisherToObservable(decodeResponseBody(clientResponse, bodyType, messageConverters)); + } + + /** + * Extract the response body and decode it, returning it as a {@code Observable}. + */ + public static BodyExtractor> asStream(Class sourceClass) { + ResolvableType bodyType = ResolvableType.forClass(sourceClass); + return asStream(bodyType); + } + + /** + * Extract the full response body as a {@code ResponseEntity} + * with its body decoded as a single type {@code T}. + * @see ResolvableType#forClassWithGenerics(Class, Class[]) + */ + @SuppressWarnings("unchecked") + public static ResponseExtractor>> response(ResolvableType bodyType) { + + return (clientResponse, webClientConfig) -> + RxJava1Adapter.publisherToSingle(clientResponse + .then(response -> + Mono.when( + decodeResponseBody(response, bodyType, webClientConfig.getMessageConverters()).next(), + Mono.just(response.getHeaders()), + Mono.just(response.getStatusCode()))) + .map(tuple -> + new ResponseEntity<>((T) tuple.getT1(), tuple.getT2(), tuple.getT3()))); } /** * Extract the full response body as a {@code ResponseEntity} * with its body decoded as a single type {@code T}. */ - @SuppressWarnings("unchecked") public static ResponseExtractor>> response(Class sourceClass) { - ResolvableType resolvableType = ResolvableType.forClass(sourceClass); - return (clientResponse, messageConverters) -> - RxJava1Adapter.publisherToSingle(clientResponse - .then(response -> - Mono.when( - decodeResponseBody(response, resolvableType, messageConverters).next(), - Mono.just(response.getHeaders()), - Mono.just(response.getStatusCode()))) - .map(tuple -> - new ResponseEntity<>((T) tuple.getT1(), tuple.getT2(), tuple.getT3()))); + ResolvableType bodyType = ResolvableType.forClass(sourceClass); + return response(bodyType); } /** @@ -90,10 +162,19 @@ public class RxJava1ResponseExtractors { */ public static ResponseExtractor>>> responseStream(Class sourceClass) { ResolvableType resolvableType = ResolvableType.forClass(sourceClass); - return (clientResponse, messageConverters) -> RxJava1Adapter.publisherToSingle( - clientResponse.map(response -> new ResponseEntity<>( - RxJava1Adapter.publisherToObservable( - RxJava1ResponseExtractors. decodeResponseBody(response, resolvableType, messageConverters)), + return responseStream(resolvableType); + } + + /** + * Extract the full response body as a {@code ResponseEntity} + * with its body decoded as an {@code Observable} + * @see ResolvableType#forClassWithGenerics(Class, Class[]) + */ + public static ResponseExtractor>>> responseStream(ResolvableType bodyType) { + return (clientResponse, webClientConfig) -> RxJava1Adapter.publisherToSingle(clientResponse + .map(response -> new ResponseEntity<>( + RxJava1Adapter + .publisherToObservable(decodeResponseBody(response, bodyType, webClientConfig.getMessageConverters())), response.getHeaders(), response.getStatusCode()))); } @@ -107,22 +188,33 @@ public class RxJava1ResponseExtractors { } @SuppressWarnings("unchecked") - protected static Flux decodeResponseBody(ClientHttpResponse response, ResolvableType responseType, - List> messageConverters) { + protected static Flux decodeResponseBody(ClientHttpResponse response, + ResolvableType responseType, List> messageConverters) { MediaType contentType = response.getHeaders().getContentType(); - Optional> converter = resolveConverter(messageConverters, responseType, contentType); - if (!converter.isPresent()) { - return Flux.error(new IllegalStateException("Could not decode response body of type '" + contentType + - "' with target type '" + responseType.toString() + "'")); - } - return (Flux) converter.get().read(responseType, response); + HttpMessageConverter converter = resolveConverter(messageConverters, responseType, contentType); + return (Flux) converter.read(responseType, response); } + @SuppressWarnings("unchecked") + protected static Mono decodeResponseBodyAsMono(ClientHttpResponse response, + ResolvableType responseType, List> messageConverters) { - protected static Optional> resolveConverter(List> messageConverters, - ResolvableType type, MediaType mediaType) { - return messageConverters.stream().filter(e -> e.canRead(type, mediaType)).findFirst(); + MediaType contentType = response.getHeaders().getContentType(); + HttpMessageConverter converter = resolveConverter(messageConverters, responseType, contentType); + return (Mono) converter.readMono(responseType, response); + } + + protected static HttpMessageConverter resolveConverter( + List> messageConverters, ResolvableType responseType, MediaType contentType) { + + return messageConverters.stream() + .filter(e -> e.canRead(responseType, contentType)) + .findFirst() + .orElseThrow(() -> + new WebClientException( + "Could not decode response body of type '" + contentType + + "' with target type '" + responseType.toString() + "'")); } } diff --git a/spring-web/src/test/java/org/springframework/http/MockHttpOutputMessage.java b/spring-web/src/test/java/org/springframework/http/MockHttpOutputMessage.java index e23d739d75..7c4bbd6606 100644 --- a/spring-web/src/test/java/org/springframework/http/MockHttpOutputMessage.java +++ b/spring-web/src/test/java/org/springframework/http/MockHttpOutputMessage.java @@ -45,7 +45,7 @@ public class MockHttpOutputMessage implements HttpOutputMessage { /** * Return a copy of the actual headers written at the time of the call to - * getBody, i.e. ignoring any further changes that may have been made to + * getResponseBody, i.e. ignoring any further changes that may have been made to * the underlying headers, e.g. via a previously obtained instance. */ public HttpHeaders getWrittenHeaders() { diff --git a/spring-web/src/test/java/org/springframework/web/client/reactive/DefaultResponseErrorHandlerTests.java b/spring-web/src/test/java/org/springframework/web/client/reactive/DefaultResponseErrorHandlerTests.java new file mode 100644 index 0000000000..e510d169ec --- /dev/null +++ b/spring-web/src/test/java/org/springframework/web/client/reactive/DefaultResponseErrorHandlerTests.java @@ -0,0 +1,99 @@ +package org.springframework.web.client.reactive; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; +import static org.mockito.BDDMockito.*; +import static org.mockito.Mockito.mock; +import static org.springframework.web.client.reactive.ResponseExtractors.*; + +import java.util.Collections; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.test.TestSubscriber; + +import org.springframework.core.codec.StringDecoder; +import org.springframework.core.codec.StringEncoder; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.http.converter.reactive.CodecHttpMessageConverter; +import org.springframework.http.converter.reactive.HttpMessageConverter; + +/** + * Unit tests for {@link DefaultResponseErrorHandler}. + * + * @author Brian Clozel + */ +public class DefaultResponseErrorHandlerTests { + + private DefaultResponseErrorHandler errorHandler; + + private ClientHttpResponse response; + + private List> messageConverters; + + @Before + public void setUp() throws Exception { + this.errorHandler = new DefaultResponseErrorHandler(); + this.response = mock(ClientHttpResponse.class); + this.messageConverters = Collections + .singletonList(new CodecHttpMessageConverter<>(new StringEncoder(), new StringDecoder())); + } + + @Test + public void noError() throws Exception { + given(this.response.getStatusCode()).willReturn(HttpStatus.OK); + this.errorHandler.handleError(this.response, this.messageConverters); + } + + @Test + public void clientError() throws Exception { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.TEXT_PLAIN); + DataBuffer buffer = new DefaultDataBufferFactory().allocateBuffer(); + buffer.write(new String("Page Not Found").getBytes("UTF-8")); + given(this.response.getStatusCode()).willReturn(HttpStatus.NOT_FOUND); + given(this.response.getHeaders()).willReturn(headers); + given(this.response.getBody()).willReturn(Flux.just(buffer)); + try { + this.errorHandler.handleError(this.response, this.messageConverters); + fail("expected HttpClientErrorException"); + } + catch (WebClientErrorException exc) { + assertThat(exc.getMessage(), is("404 Not Found")); + assertThat(exc.getStatus(), is(HttpStatus.NOT_FOUND)); + TestSubscriber.subscribe(exc.getResponseBody(as(String.class))) + .awaitAndAssertNextValues("Page Not Found") + .assertComplete(); + } + } + + @Test + public void serverError() throws Exception { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.TEXT_PLAIN); + DataBuffer buffer = new DefaultDataBufferFactory().allocateBuffer(); + buffer.write(new String("Internal Server Error").getBytes("UTF-8")); + given(this.response.getStatusCode()).willReturn(HttpStatus.INTERNAL_SERVER_ERROR); + given(this.response.getHeaders()).willReturn(headers); + given(this.response.getBody()).willReturn(Flux.just(buffer)); + try { + this.errorHandler.handleError(this.response, this.messageConverters); + fail("expected HttpServerErrorException"); + } + catch (WebServerErrorException exc) { + assertThat(exc.getMessage(), is("500 Internal Server Error")); + assertThat(exc.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR)); + TestSubscriber.subscribe(exc.getResponseBody(as(String.class))) + .awaitAndAssertNextValues("Internal Server Error") + .assertComplete(); + } + } + +} diff --git a/spring-web/src/test/java/org/springframework/web/client/reactive/ResponseExtractorsTests.java b/spring-web/src/test/java/org/springframework/web/client/reactive/ResponseExtractorsTests.java new file mode 100644 index 0000000000..15ea0f7fa0 --- /dev/null +++ b/spring-web/src/test/java/org/springframework/web/client/reactive/ResponseExtractorsTests.java @@ -0,0 +1,209 @@ +package org.springframework.web.client.reactive; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; +import static org.mockito.BDDMockito.eq; +import static org.mockito.BDDMockito.*; +import static org.mockito.Mockito.mock; + +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.TestSubscriber; +import reactor.core.Exceptions; + +import org.springframework.core.codec.StringDecoder; +import org.springframework.core.codec.StringEncoder; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.http.codec.json.JacksonJsonDecoder; +import org.springframework.http.codec.json.JacksonJsonEncoder; +import org.springframework.http.converter.reactive.CodecHttpMessageConverter; +import org.springframework.http.converter.reactive.HttpMessageConverter; + +/** + * Unit tests for {@link ResponseExtractors}. + * + * @author Brian Clozel + */ +public class ResponseExtractorsTests { + + private HttpHeaders headers = new HttpHeaders(); + + private ClientHttpResponse response; + + private List> messageConverters; + + private WebClientConfig webClientConfig; + + private ResponseErrorHandler errorHandler; + + @Before + public void setup() throws Exception { + this.headers = new HttpHeaders(); + this.response = mock(ClientHttpResponse.class); + given(this.response.getHeaders()).willReturn(headers); + this.messageConverters = Arrays.asList( + new CodecHttpMessageConverter<>(new StringEncoder(), new StringDecoder()), + new CodecHttpMessageConverter<>(new JacksonJsonEncoder(), new JacksonJsonDecoder())); + this.webClientConfig = mock(WebClientConfig.class); + this.errorHandler = mock(ResponseErrorHandler.class); + given(this.webClientConfig.getMessageConverters()).willReturn(this.messageConverters); + given(this.webClientConfig.getResponseErrorHandler()).willReturn(this.errorHandler); + } + + @Test + public void shouldExtractResponseEntityMono() throws Exception { + this.headers.setContentType(MediaType.TEXT_PLAIN); + given(this.response.getStatusCode()).willReturn(HttpStatus.OK); + given(this.response.getBody()).willReturn(createFluxBody("test content")); + + Mono> result = ResponseExtractors.response(String.class) + .extract(Mono.just(this.response), this.webClientConfig); + + TestSubscriber.subscribe(result) + .awaitAndAssertNextValuesWith(entity -> { + assertThat(entity.getStatusCode(), is(HttpStatus.OK)); + assertThat(entity.getHeaders().getContentType(), is(MediaType.TEXT_PLAIN)); + assertThat(entity.getBody(), is("test content")); + }) + .assertComplete(); + } + + @Test + public void shouldExtractResponseEntityFlux() throws Exception { + this.headers.setContentType(MediaType.TEXT_PLAIN); + given(this.response.getStatusCode()).willReturn(HttpStatus.OK); + given(this.response.getBody()).willReturn(createFluxBody("test", " content")); + + Mono> result = ResponseExtractors.response(String.class) + .extract(Mono.just(this.response), this.webClientConfig); + + TestSubscriber.subscribe(result) + .awaitAndAssertNextValuesWith(entity -> { + assertThat(entity.getStatusCode(), is(HttpStatus.OK)); + assertThat(entity.getHeaders().getContentType(), is(MediaType.TEXT_PLAIN)); + assertThat(entity.getBody(), is("test content")); + }) + .assertComplete(); + } + + @Test + public void shouldExtractResponseEntityWithEmptyBody() throws Exception { + given(this.response.getStatusCode()).willReturn(HttpStatus.NO_CONTENT); + given(this.response.getBody()).willReturn(Flux.empty()); + + Mono> result = ResponseExtractors.response(String.class) + .extract(Mono.just(this.response), this.webClientConfig); + + TestSubscriber.subscribe(result) + .awaitAndAssertNextValuesWith(entity -> { + assertThat(entity.getStatusCode(), is(HttpStatus.NO_CONTENT)); + assertNull(entity.getBody()); + }) + .assertComplete(); + } + + @Test + public void shouldExtractResponseEntityAsStream() throws Exception { + this.headers.setContentType(MediaType.TEXT_PLAIN); + given(this.response.getStatusCode()).willReturn(HttpStatus.OK); + given(this.response.getBody()).willReturn(createFluxBody("test", " content")); + + Mono>> result = ResponseExtractors.responseStream(String.class) + .extract(Mono.just(this.response), this.webClientConfig); + + TestSubscriber.subscribe(result) + .awaitAndAssertNextValuesWith(entity -> { + assertThat(entity.getStatusCode(), is(HttpStatus.OK)); + assertThat(entity.getHeaders().getContentType(), is(MediaType.TEXT_PLAIN)); + TestSubscriber.subscribe(entity.getBody()) + .awaitAndAssertNextValues("test", " content") + .assertComplete(); + }) + .assertComplete(); + } + + @Test + public void shouldGetErrorWhenExtractingWithMissingConverter() throws Exception { + this.headers.setContentType(MediaType.APPLICATION_XML); + given(this.response.getStatusCode()).willReturn(HttpStatus.OK); + given(this.response.getBody()).willReturn(createFluxBody("test content")); + + Mono> result = ResponseExtractors.response(SomePojo.class) + .extract(Mono.just(this.response), this.webClientConfig); + + TestSubscriber.subscribe(result) + .assertErrorWith(t -> { + assertThat(t, instanceOf(WebClientException.class)); + WebClientException exc = (WebClientException) t; + assertThat(exc.getMessage(), containsString("Could not decode response body of type 'application/xml'")); + assertThat(exc.getMessage(), containsString("$SomePojo")); + }); + } + + @Test + public void shouldExtractResponseHeaders() throws Exception { + this.headers.setContentType(MediaType.TEXT_PLAIN); + this.headers.setETag("\"Spring\""); + given(this.response.getStatusCode()).willReturn(HttpStatus.OK); + + Mono result = ResponseExtractors.headers() + .extract(Mono.just(this.response), this.webClientConfig); + + TestSubscriber.subscribe(result) + .awaitAndAssertNextValuesWith(headers -> { + assertThat(headers.getContentType(), is(MediaType.TEXT_PLAIN)); + assertThat(headers.getETag(), is("\"Spring\"")); + }) + .assertComplete(); + } + + @Test + public void shouldExecuteResponseHandler() throws Exception { + this.headers.setContentType(MediaType.TEXT_PLAIN); + given(this.response.getStatusCode()).willReturn(HttpStatus.NOT_FOUND); + given(this.response.getBody()).willReturn(createFluxBody("test", " content")); + + Mono result = ResponseExtractors.body(String.class) + .extract(Mono.just(this.response), this.webClientConfig); + + TestSubscriber.subscribe(result) + .assertValueCount(1) + .assertComplete(); + + then(this.errorHandler).should().handleError(eq(this.response), eq(this.messageConverters)); + } + + + private Flux createFluxBody(String... items) throws Exception { + + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + return Flux.just(items) + .map(item -> { + DataBuffer buffer = factory.allocateBuffer(); + try { + buffer.write(new String(item).getBytes("UTF-8")); + } + catch (UnsupportedEncodingException exc) { + Exceptions.propagate(exc); + } + return buffer; + }); + } + + protected class SomePojo { + public String foo; + } + +} diff --git a/spring-web/src/test/java/org/springframework/web/client/reactive/RxJava1WebClientIntegrationTests.java b/spring-web/src/test/java/org/springframework/web/client/reactive/RxJava1WebClientIntegrationTests.java index a974595f23..750741370e 100644 --- a/spring-web/src/test/java/org/springframework/web/client/reactive/RxJava1WebClientIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/web/client/reactive/RxJava1WebClientIntegrationTests.java @@ -28,9 +28,7 @@ import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; import org.hamcrest.Matchers; import org.junit.After; -import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import rx.Observable; import rx.Single; @@ -43,7 +41,7 @@ import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.http.codec.Pojo; /** - * {@link WebClient} integration tests with the {@code Obserable} and {@code Single} API. + * {@link WebClient} integration tests with the {@code Observable} and {@code Single} API. * * @author Brian Clozel */ @@ -80,9 +78,9 @@ public class RxJava1WebClientIntegrationTests { ts.assertCompleted(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); - Assert.assertEquals("/greeting?name=Spring", request.getPath()); + assertEquals(1, server.getRequestCount()); + assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals("/greeting?name=Spring", request.getPath()); } @Test @@ -106,10 +104,10 @@ public class RxJava1WebClientIntegrationTests { ts.assertCompleted(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("testvalue", request.getHeader("X-Test-Header")); - Assert.assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); - Assert.assertEquals("/greeting?name=Spring", request.getPath()); + assertEquals(1, server.getRequestCount()); + assertEquals("testvalue", request.getHeader("X-Test-Header")); + assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals("/greeting?name=Spring", request.getPath()); } @Test @@ -135,9 +133,9 @@ public class RxJava1WebClientIntegrationTests { ts.assertCompleted(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/greeting?name=Spring", request.getPath()); - Assert.assertEquals("text/plain", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals(1, server.getRequestCount()); + assertEquals("/greeting?name=Spring", request.getPath()); + assertEquals("text/plain", request.getHeader(HttpHeaders.ACCEPT)); } @Test @@ -163,9 +161,9 @@ public class RxJava1WebClientIntegrationTests { ts.assertCompleted(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/json", request.getPath()); - Assert.assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals(1, server.getRequestCount()); + assertEquals("/json", request.getPath()); + assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); } @Test @@ -190,9 +188,9 @@ public class RxJava1WebClientIntegrationTests { ts.assertCompleted(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/pojo", request.getPath()); - Assert.assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals(1, server.getRequestCount()); + assertEquals("/pojo", request.getPath()); + assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); } @Test @@ -217,9 +215,9 @@ public class RxJava1WebClientIntegrationTests { ts.assertCompleted(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/pojos", request.getPath()); - Assert.assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals(1, server.getRequestCount()); + assertEquals("/pojos", request.getPath()); + assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); } @Test @@ -245,9 +243,9 @@ public class RxJava1WebClientIntegrationTests { ts.assertCompleted(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/pojos", request.getPath()); - Assert.assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals(1, server.getRequestCount()); + assertEquals("/pojos", request.getPath()); + assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); } @Test @@ -275,12 +273,12 @@ public class RxJava1WebClientIntegrationTests { ts.assertCompleted(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/pojo/capitalize", request.getPath()); - Assert.assertEquals("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", request.getBody().readUtf8()); - Assert.assertEquals("chunked", request.getHeader(HttpHeaders.TRANSFER_ENCODING)); - Assert.assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); - Assert.assertEquals("application/json", request.getHeader(HttpHeaders.CONTENT_TYPE)); + assertEquals(1, server.getRequestCount()); + assertEquals("/pojo/capitalize", request.getPath()); + assertEquals("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", request.getBody().readUtf8()); + assertEquals("chunked", request.getHeader(HttpHeaders.TRANSFER_ENCODING)); + assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals("application/json", request.getHeader(HttpHeaders.CONTENT_TYPE)); } @Test @@ -304,33 +302,35 @@ public class RxJava1WebClientIntegrationTests { ts.assertCompleted(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/test", request.getPath()); - Assert.assertEquals("testkey=testvalue", request.getHeader(HttpHeaders.COOKIE)); + assertEquals(1, server.getRequestCount()); + assertEquals("/test", request.getPath()); + assertEquals("testkey=testvalue", request.getHeader(HttpHeaders.COOKIE)); } @Test - @Ignore public void shouldGetErrorWhen404() throws Exception { HttpUrl baseUrl = server.url("/greeting?name=Spring"); - this.server.enqueue(new MockResponse().setResponseCode(404)); + this.server.enqueue(new MockResponse().setResponseCode(404) + .setHeader("Content-Type", "text/plain").setBody("Not Found")); Single result = this.webClient .perform(get(baseUrl.toString())) .extract(body(String.class)); - // TODO: error message should be converted to a ClientException TestSubscriber ts = new TestSubscriber(); result.subscribe(ts); ts.awaitTerminalEvent(2, TimeUnit.SECONDS); - ts.assertError(WebClientException.class); + ts.assertError(WebClientErrorException.class); + WebClientErrorException exc = (WebClientErrorException) ts.getOnErrorEvents().get(0); + assertEquals(404, exc.getStatus().value()); + assertEquals(MediaType.TEXT_PLAIN, exc.getResponseHeaders().getContentType()); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); - Assert.assertEquals("/greeting?name=Spring", request.getPath()); + assertEquals(1, server.getRequestCount()); + assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals("/greeting?name=Spring", request.getPath()); } @After diff --git a/spring-web/src/test/java/org/springframework/web/client/reactive/WebClientIntegrationTests.java b/spring-web/src/test/java/org/springframework/web/client/reactive/WebClientIntegrationTests.java index c7d910c61a..fded9b3013 100644 --- a/spring-web/src/test/java/org/springframework/web/client/reactive/WebClientIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/web/client/reactive/WebClientIntegrationTests.java @@ -28,18 +28,17 @@ import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; import org.hamcrest.Matchers; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.TestSubscriber; -import org.springframework.http.codec.Pojo; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.http.codec.Pojo; /** * {@link WebClient} integration tests with the {@code Flux} and {@code Mono} API. @@ -71,16 +70,16 @@ public class WebClientIntegrationTests { TestSubscriber .subscribe(result) .awaitAndAssertNextValuesWith( - httpHeaders -> { - assertEquals(MediaType.TEXT_PLAIN, httpHeaders.getContentType()); - assertEquals(13L, httpHeaders.getContentLength()); - }) + httpHeaders -> { + assertEquals(MediaType.TEXT_PLAIN, httpHeaders.getContentType()); + assertEquals(13L, httpHeaders.getContentLength()); + }) .assertComplete(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); - Assert.assertEquals("/greeting?name=Spring", request.getPath()); + assertEquals(1, server.getRequestCount()); + assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals("/greeting?name=Spring", request.getPath()); } @Test @@ -101,10 +100,10 @@ public class WebClientIntegrationTests { .assertComplete(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("testvalue", request.getHeader("X-Test-Header")); - Assert.assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); - Assert.assertEquals("/greeting?name=Spring", request.getPath()); + assertEquals(1, server.getRequestCount()); + assertEquals("testvalue", request.getHeader("X-Test-Header")); + assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals("/greeting?name=Spring", request.getPath()); } @Test @@ -124,11 +123,11 @@ public class WebClientIntegrationTests { assertEquals(200, response.getStatusCode().value()); assertEquals(MediaType.TEXT_PLAIN, response.getHeaders().getContentType()); assertEquals("Hello Spring!", response.getBody()); - }); + }); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/greeting?name=Spring", request.getPath()); - Assert.assertEquals("text/plain", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals(1, server.getRequestCount()); + assertEquals("/greeting?name=Spring", request.getPath()); + assertEquals("text/plain", request.getHeader(HttpHeaders.ACCEPT)); } @Test @@ -149,9 +148,9 @@ public class WebClientIntegrationTests { .awaitAndAssertNextValues(content) .assertComplete(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/json", request.getPath()); - Assert.assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals(1, server.getRequestCount()); + assertEquals("/json", request.getPath()); + assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); } @Test @@ -171,9 +170,9 @@ public class WebClientIntegrationTests { .awaitAndAssertNextValuesWith(p -> assertEquals("barbar", p.getBar())) .assertComplete(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/pojo", request.getPath()); - Assert.assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals(1, server.getRequestCount()); + assertEquals("/pojo", request.getPath()); + assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); } @Test @@ -191,14 +190,14 @@ public class WebClientIntegrationTests { TestSubscriber .subscribe(result) .awaitAndAssertNextValuesWith( - p -> assertThat(p.getBar(), Matchers.is("bar1")), - p -> assertThat(p.getBar(), Matchers.is("bar2"))) + p -> assertThat(p.getBar(), Matchers.is("bar1")), + p -> assertThat(p.getBar(), Matchers.is("bar2"))) .assertValueCount(2) .assertComplete(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/pojos", request.getPath()); - Assert.assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals(1, server.getRequestCount()); + assertEquals("/pojos", request.getPath()); + assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); } @Test @@ -216,15 +215,15 @@ public class WebClientIntegrationTests { TestSubscriber .subscribe(result) .awaitAndAssertNextValuesWith( - response -> { - assertEquals(200, response.getStatusCode().value()); - assertEquals(MediaType.APPLICATION_JSON, response.getHeaders().getContentType()); - }) + response -> { + assertEquals(200, response.getStatusCode().value()); + assertEquals(MediaType.APPLICATION_JSON, response.getHeaders().getContentType()); + }) .assertComplete(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/pojos", request.getPath()); - Assert.assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals(1, server.getRequestCount()); + assertEquals("/pojos", request.getPath()); + assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); } @Test @@ -249,12 +248,12 @@ public class WebClientIntegrationTests { .assertComplete(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/pojo/capitalize", request.getPath()); - Assert.assertEquals("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", request.getBody().readUtf8()); - Assert.assertEquals("chunked", request.getHeader(HttpHeaders.TRANSFER_ENCODING)); - Assert.assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); - Assert.assertEquals("application/json", request.getHeader(HttpHeaders.CONTENT_TYPE)); + assertEquals(1, server.getRequestCount()); + assertEquals("/pojo/capitalize", request.getPath()); + assertEquals("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", request.getBody().readUtf8()); + assertEquals("chunked", request.getHeader(HttpHeaders.TRANSFER_ENCODING)); + assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals("application/json", request.getHeader(HttpHeaders.CONTENT_TYPE)); } @Test @@ -274,16 +273,17 @@ public class WebClientIntegrationTests { .assertComplete(); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("/test", request.getPath()); - Assert.assertEquals("testkey=testvalue", request.getHeader(HttpHeaders.COOKIE)); + assertEquals(1, server.getRequestCount()); + assertEquals("/test", request.getPath()); + assertEquals("testkey=testvalue", request.getHeader(HttpHeaders.COOKIE)); } @Test public void shouldGetErrorWhen404() throws Exception { HttpUrl baseUrl = server.url("/greeting?name=Spring"); - this.server.enqueue(new MockResponse().setResponseCode(404)); + this.server.enqueue(new MockResponse().setResponseCode(404) + .setHeader("Content-Type", "text/plain").setBody("Not Found")); Mono result = this.webClient .perform(get(baseUrl.toString())) @@ -292,12 +292,50 @@ public class WebClientIntegrationTests { TestSubscriber .subscribe(result) .await() - .assertError(); + .assertErrorWith(t -> { + assertThat(t, Matchers.instanceOf(WebClientErrorException.class)); + WebClientErrorException exc = (WebClientErrorException) t; + assertEquals(404, exc.getStatus().value()); + assertEquals(MediaType.TEXT_PLAIN, exc.getResponseHeaders().getContentType()); + + Mono body = exc.getResponseBody(as(String.class)); + + TestSubscriber.subscribe(body) + .awaitAndAssertNextValues("Not Found") + .assertComplete(); + }); RecordedRequest request = server.takeRequest(); - Assert.assertEquals(1, server.getRequestCount()); - Assert.assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); - Assert.assertEquals("/greeting?name=Spring", request.getPath()); + assertEquals(1, server.getRequestCount()); + assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals("/greeting?name=Spring", request.getPath()); + } + + @Test + public void shouldGetErrorWhen500() throws Exception { + + HttpUrl baseUrl = server.url("/greeting?name=Spring"); + this.server.enqueue(new MockResponse().setResponseCode(500) + .setHeader("Content-Type", "text/plain").setBody("Server Error")); + + Mono result = this.webClient + .perform(get(baseUrl.toString())) + .extract(body(String.class)); + + TestSubscriber + .subscribe(result) + .await() + .assertErrorWith(t -> { + assertThat(t, Matchers.instanceOf(WebServerErrorException.class)); + WebServerErrorException exc = (WebServerErrorException) t; + assertEquals(500, exc.getStatus().value()); + assertEquals(MediaType.TEXT_PLAIN, exc.getResponseHeaders().getContentType()); + }); + + RecordedRequest request = server.takeRequest(); + assertEquals(1, server.getRequestCount()); + assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals("/greeting?name=Spring", request.getPath()); } @After