MultipartHttpMessageWriter consumes source once only
The previous fix #09f1f7 did not actually address the issue but only moved it further down, so instead of the subscribe(), it was consuming it inside the MultipartHttpMessageWriter#write method which returned this.body.then(), and then again for the actual request body writing. In this commit MultipartHttpMessageWriter#write returns Mono.empty() since we don't actually want to write the part content from there, but only want to access it as soon as it is availabele, for writing to the request body. Issue: SPR-16402
This commit is contained in:
@@ -269,13 +269,18 @@ public class MultipartHttpMessageWriter implements HttpMessageWriter<MultiValueM
|
||||
Publisher<T> bodyPublisher =
|
||||
body instanceof Publisher ? (Publisher<T>) body : Mono.just(body);
|
||||
|
||||
Mono<Void> partWritten = ((HttpMessageWriter<T>) writer.get())
|
||||
// The writer will call MultipartHttpOutputMessage#write which doesn't actually write
|
||||
// but only stores the body Flux and returns Mono.empty().
|
||||
|
||||
Mono<Void> partContentReady = ((HttpMessageWriter<T>) writer.get())
|
||||
.write(bodyPublisher, resolvableType, contentType, outputMessage, Collections.emptyMap());
|
||||
|
||||
return Flux.concat(
|
||||
Mono.just(generateBoundaryLine(boundary)),
|
||||
partWritten.thenMany(Flux.defer(outputMessage::getBody)),
|
||||
Mono.just(generateNewLine()));
|
||||
// After partContentReady, we can access the part content from MultipartHttpOutputMessage
|
||||
// and use it for writing to the actual request body
|
||||
|
||||
Flux<DataBuffer> partContent = partContentReady.thenMany(Flux.defer(outputMessage::getBody));
|
||||
|
||||
return Flux.concat(Mono.just(generateBoundaryLine(boundary)), partContent, Mono.just(generateNewLine()));
|
||||
}
|
||||
|
||||
|
||||
@@ -353,7 +358,9 @@ public class MultipartHttpMessageWriter implements HttpMessageWriter<MultiValueM
|
||||
return Mono.error(new IllegalStateException("Multiple calls to writeWith() not supported"));
|
||||
}
|
||||
this.body = Flux.just(generateHeaders()).concatWith(body);
|
||||
return this.body.then();
|
||||
|
||||
// We don't actually want to write (just save the body Flux)
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
private DataBuffer generateHeaders() {
|
||||
@@ -387,8 +394,7 @@ public class MultipartHttpMessageWriter implements HttpMessageWriter<MultiValueM
|
||||
|
||||
@Override
|
||||
public Mono<Void> setComplete() {
|
||||
return (this.body != null ? this.body.then() :
|
||||
Mono.error(new IllegalStateException("Body has not been written yet")));
|
||||
return Mono.error(new UnsupportedOperationException());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.http.codec.multipart;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@@ -25,6 +26,7 @@ import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.UnicastProcessor;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.codec.StringDecoder;
|
||||
@@ -146,9 +148,48 @@ public class MultipartHttpMessageWriterTests {
|
||||
Collections.emptyMap()).block(Duration.ZERO);
|
||||
|
||||
assertEquals("foobarbaz", value);
|
||||
}
|
||||
|
||||
@Test // SPR-16402
|
||||
public void singleSubscriberWithResource() throws IOException {
|
||||
UnicastProcessor<Resource> processor = UnicastProcessor.create();
|
||||
Resource logo = new ClassPathResource("/org/springframework/http/converter/logo.jpg");
|
||||
Mono.just(logo).subscribe(processor);
|
||||
|
||||
MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
|
||||
bodyBuilder.asyncPart("logo", processor, Resource.class);
|
||||
|
||||
Mono<MultiValueMap<String, HttpEntity<?>>> result = Mono.just(bodyBuilder.build());
|
||||
|
||||
MockServerHttpResponse response = new MockServerHttpResponse();
|
||||
Map<String, Object> hints = Collections.emptyMap();
|
||||
this.writer.write(result, null, MediaType.MULTIPART_FORM_DATA, response, hints).block();
|
||||
|
||||
MultiValueMap<String, Part> requestParts = parse(response, hints);
|
||||
assertEquals(1, requestParts.size());
|
||||
|
||||
Part part = requestParts.getFirst("logo");
|
||||
assertEquals("logo", part.name());
|
||||
// TODO: a Resource written as an async part doesn't have a file name in the contentDisposition
|
||||
// assertTrue(part instanceof FilePart);
|
||||
// assertEquals("logo.jpg", ((FilePart) part).filename());
|
||||
assertEquals(MediaType.IMAGE_JPEG, part.headers().getContentType());
|
||||
assertEquals(logo.getFile().length(), part.headers().getContentLength());
|
||||
}
|
||||
|
||||
@Test // SPR-16402
|
||||
public void singleSubscriberWithStrings() {
|
||||
UnicastProcessor<String> processor = UnicastProcessor.create();
|
||||
Flux.just("foo", "bar", "baz").subscribe(processor);
|
||||
|
||||
MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
|
||||
bodyBuilder.asyncPart("name", processor, String.class);
|
||||
|
||||
Mono<MultiValueMap<String, HttpEntity<?>>> result = Mono.just(bodyBuilder.build());
|
||||
|
||||
MockServerHttpResponse response = new MockServerHttpResponse();
|
||||
Map<String, Object> hints = Collections.emptyMap();
|
||||
this.writer.write(result, null, MediaType.MULTIPART_FORM_DATA, response, hints).block();
|
||||
}
|
||||
|
||||
private MultiValueMap<String, Part> parse(MockServerHttpResponse response, Map<String, Object> hints) {
|
||||
|
||||
Reference in New Issue
Block a user