From d2fa5536dbfa1095d74d17a12441f79495d9de30 Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Tue, 2 Apr 2019 15:11:00 +0200 Subject: [PATCH] Use more efficient Reactor operators Use handle/flatMapIterable instead of flatMap/flatMapMany when possible. Closes gh-22727 --- .../codec/json/AbstractJackson2Decoder.java | 12 +++++---- .../http/codec/xml/Jaxb2XmlDecoder.java | 15 +++++------ .../http/codec/xml/XmlEventDecoder.java | 25 +++++++++---------- .../function/client/DefaultWebClient.java | 4 +-- .../RequestPartMethodArgumentResolver.java | 12 ++++++--- 5 files changed, 37 insertions(+), 31 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Decoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Decoder.java index 7cf4c885fb..1f4dcfe3bb 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Decoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Decoder.java @@ -115,7 +115,7 @@ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport imple getObjectMapper().readerWithView(jsonView).forType(javaType) : getObjectMapper().readerFor(javaType)); - return tokens.flatMap(tokenBuffer -> { + return tokens.handle((tokenBuffer, sink) -> { try { Object value = reader.readValue(tokenBuffer.asParser(getObjectMapper())); if (!Hints.isLoggingSuppressed(hints)) { @@ -124,16 +124,18 @@ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport imple return Hints.getLogPrefix(hints) + "Decoded [" + formatted + "]"; }); } - return Mono.justOrEmpty(value); + if (value != null) { + sink.next(value); + } } catch (InvalidDefinitionException ex) { - return Mono.error(new CodecException("Type definition error: " + ex.getType(), ex)); + sink.error(new CodecException("Type definition error: " + ex.getType(), ex)); } catch (JsonProcessingException ex) { - return Mono.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex)); + sink.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex)); } catch (IOException ex) { - return Mono.error(new DecodingException("I/O error while parsing input stream", ex)); + sink.error(new DecodingException("I/O error while parsing input stream", ex)); } }); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlDecoder.java index 0778932274..ab63d30741 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ package org.springframework.http.codec.xml; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.Function; import javax.xml.XMLConstants; import javax.xml.bind.JAXBElement; @@ -35,6 +36,7 @@ import javax.xml.stream.events.XMLEvent; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SynchronousSink; import org.springframework.core.ResolvableType; import org.springframework.core.codec.AbstractDecoder; @@ -224,11 +226,11 @@ public class Jaxb2XmlDecoder extends AbstractDecoder { * */ Flux> split(Flux xmlEventFlux, QName desiredName) { - return xmlEventFlux.flatMap(new SplitFunction(desiredName)); + return xmlEventFlux.handle(new SplitHandler(desiredName)); } - private static class SplitFunction implements Function>> { + private static class SplitHandler implements BiConsumer>> { private final QName desiredName; @@ -239,12 +241,12 @@ public class Jaxb2XmlDecoder extends AbstractDecoder { private int barrier = Integer.MAX_VALUE; - public SplitFunction(QName desiredName) { + public SplitHandler(QName desiredName) { this.desiredName = desiredName; } @Override - public Publisher> apply(XMLEvent event) { + public void accept(XMLEvent event, SynchronousSink> sink) { if (event.isStartElement()) { if (this.barrier == Integer.MAX_VALUE) { QName startElementName = event.asStartElement().getName(); @@ -264,10 +266,9 @@ public class Jaxb2XmlDecoder extends AbstractDecoder { if (this.elementDepth == this.barrier) { this.barrier = Integer.MAX_VALUE; Assert.state(this.events != null, "No XMLEvent List"); - return Mono.just(this.events); + sink.next(this.events); } } - return Mono.empty(); } } diff --git a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java index fb54f43540..027177d7db 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java @@ -18,7 +18,6 @@ package org.springframework.http.codec.xml; import java.io.InputStream; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -33,6 +32,7 @@ import com.fasterxml.aalto.AsyncXMLStreamReader; import com.fasterxml.aalto.evt.EventAllocatorImpl; import com.fasterxml.aalto.stax.InputFactoryImpl; import org.reactivestreams.Publisher; +import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -106,18 +106,17 @@ public class XmlEventDecoder extends AbstractDecoder { } else { Mono singleBuffer = DataBufferUtils.join(flux); - return singleBuffer. - flatMapMany(dataBuffer -> { - try { - InputStream is = dataBuffer.asInputStream(); - Iterator eventReader = inputFactory.createXMLEventReader(is); - return Flux.fromIterable((Iterable) () -> eventReader) - .doFinally(t -> DataBufferUtils.release(dataBuffer)); - } - catch (XMLStreamException ex) { - return Mono.error(ex); - } - }); + return singleBuffer.flatMapIterable(dataBuffer -> { + InputStream is = dataBuffer.asInputStream(); + return () -> { + try { + return inputFactory.createXMLEventReader(is); + } + catch (XMLStreamException ex) { + throw Exceptions.propagate(ex); + } + }; + }); } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index bba241be2f..dd9408d732 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -441,13 +441,13 @@ class DefaultWebClient implements WebClient { @Override public Flux bodyToFlux(Class elementType) { return this.responseMono.flatMapMany(response -> - handleBody(response, response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error))); + handleBody(response, response.bodyToFlux(elementType), mono -> mono.handle((t, sink) -> sink.error(t)))); } @Override public Flux bodyToFlux(ParameterizedTypeReference elementType) { return this.responseMono.flatMapMany(response -> - handleBody(response, response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error))); + handleBody(response, response.bodyToFlux(elementType), mono -> mono.handle((t, sink) -> sink.error(t)))); } private > T handleBody(ClientResponse response, diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestPartMethodArgumentResolver.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestPartMethodArgumentResolver.java index 6682f39c20..4fc3994251 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestPartMethodArgumentResolver.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestPartMethodArgumentResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.web.reactive.result.method.annotation; +import java.util.Collections; import java.util.List; import reactor.core.publisher.Flux; @@ -71,12 +72,15 @@ public class RequestPartMethodArgumentResolver extends AbstractMessageReaderArgu String name = getPartName(parameter, requestPart); Flux parts = exchange.getMultipartData() - .flatMapMany(map -> { + .flatMapIterable(map -> { List list = map.get(name); if (CollectionUtils.isEmpty(list)) { - return (isRequired ? Flux.error(getMissingPartException(name, parameter)) : Flux.empty()); + if (isRequired) { + throw getMissingPartException(name, parameter); + } + return Collections.emptyList(); } - return Flux.fromIterable(list); + return list; }); if (Part.class.isAssignableFrom(parameter.getParameterType())) {