From 68b5eedde12bb88b75a6db5491bbb87b46939bf7 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Thu, 6 Jul 2023 11:55:17 +0200 Subject: [PATCH] Change OutputStreamPublisher default chunk size This commit set the default chunk size to 1024 (from 8192). --- .../http/client/OutputStreamPublisher.java | 86 +++++++++++++++---- .../client/OutputStreamPublisherTests.java | 81 ++++++++++------- 2 files changed, 119 insertions(+), 48 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/OutputStreamPublisher.java b/spring-web/src/main/java/org/springframework/http/client/OutputStreamPublisher.java index 5869de45d6..5a678daf5c 100644 --- a/spring-web/src/main/java/org/springframework/http/client/OutputStreamPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/client/OutputStreamPublisher.java @@ -41,17 +41,23 @@ import org.springframework.util.Assert; */ final class OutputStreamPublisher implements Flow.Publisher { + private static final int DEFAULT_CHUNK_SIZE = 1024; + + private final OutputStreamHandler outputStreamHandler; private final ByteMapper byteMapper; private final Executor executor; + private final int chunkSize; - private OutputStreamPublisher(OutputStreamHandler outputStreamHandler, ByteMapper byteMapper, Executor executor) { + + private OutputStreamPublisher(OutputStreamHandler outputStreamHandler, ByteMapper byteMapper, Executor executor, int chunkSize) { this.outputStreamHandler = outputStreamHandler; this.byteMapper = byteMapper; this.executor = executor; + this.chunkSize = chunkSize; } @@ -64,16 +70,18 @@ final class OutputStreamPublisher implements Flow.Publisher { * subscription of the returned {@code Publisher}, when the first * item is * {@linkplain Flow.Subscription#request(long) requested}. - *
  • Each {@link OutputStream#write(byte[], int, int) OutputStream.write()} - * invocation that {@code outputStreamHandler} makes will result in a + *
  • {@link OutputStream#write(byte[], int, int) OutputStream.write()} + * invocations made by {@code outputStreamHandler} are buffered until they + * exceed the default chunk size of 1024, and then result in a * {@linkplain Flow.Subscriber#onNext(Object) published} item * if there is {@linkplain Flow.Subscription#request(long) demand}.
  • *
  • If there is no demand, {@code OutputStream.write()} will block * until there is.
  • *
  • If the subscription is {@linkplain Flow.Subscription#cancel() cancelled}, * {@code OutputStream.write()} will throw a {@code IOException}.
  • - *
  • {@linkplain OutputStream#close() Closing} the {@code OutputStream} - * will result in a {@linkplain Flow.Subscriber#onComplete() complete} signal.
  • + *
  • The subscription is + * {@linkplain Flow.Subscriber#onComplete() completed} when + * {@code outputStreamHandler} completes.
  • *
  • Any {@code IOException}s thrown from {@code outputStreamHandler} will * be dispatched to the {@linkplain Flow.Subscriber#onError(Throwable) Subscriber}. * @@ -91,15 +99,58 @@ final class OutputStreamPublisher implements Flow.Publisher { Assert.notNull(byteMapper, "ByteMapper must not be null"); Assert.notNull(executor, "Executor must not be null"); - return new OutputStreamPublisher<>(outputStreamHandler, byteMapper, executor); + return new OutputStreamPublisher<>(outputStreamHandler, byteMapper, executor, DEFAULT_CHUNK_SIZE); } + /** + * Creates a new {@code Publisher} based on bytes written to a + * {@code OutputStream}. The parameter {@code byteMapper} is used to map + * from written bytes to the published type. + *
      + *
    • The parameter {@code outputStreamHandler} is invoked once per + * subscription of the returned {@code Publisher}, when the first + * item is + * {@linkplain Flow.Subscription#request(long) requested}.
    • + *
    • {@link OutputStream#write(byte[], int, int) OutputStream.write()} + * invocations made by {@code outputStreamHandler} are buffered until they + * exceed {@code chunkSize}, and then result in a + * {@linkplain Flow.Subscriber#onNext(Object) published} item + * if there is {@linkplain Flow.Subscription#request(long) demand}.
    • + *
    • If there is no demand, {@code OutputStream.write()} will block + * until there is.
    • + *
    • If the subscription is {@linkplain Flow.Subscription#cancel() cancelled}, + * {@code OutputStream.write()} will throw a {@code IOException}.
    • + *
    • The subscription is + * {@linkplain Flow.Subscriber#onComplete() completed} when + * {@code outputStreamHandler} completes.
    • + *
    • Any {@code IOException}s thrown from {@code outputStreamHandler} will + * be dispatched to the {@linkplain Flow.Subscriber#onError(Throwable) Subscriber}. + *
    + * @param outputStreamHandler invoked when the first buffer is requested + * @param byteMapper maps written bytes to {@code T} + * @param executor used to invoke the {@code outputStreamHandler} + * @param the publisher type + * @return a {@code Publisher} based on bytes written by + * {@code outputStreamHandler} mapped by {@code byteMapper} + */ + public static Flow.Publisher create(OutputStreamHandler outputStreamHandler, ByteMapper byteMapper, + Executor executor, int chunkSize) { + + Assert.notNull(outputStreamHandler, "OutputStreamHandler must not be null"); + Assert.notNull(byteMapper, "ByteMapper must not be null"); + Assert.notNull(executor, "Executor must not be null"); + Assert.isTrue(chunkSize > 0, "ChunkSize must be larger than 0"); + + return new OutputStreamPublisher<>(outputStreamHandler, byteMapper, executor, chunkSize); + } + + @Override public void subscribe(Flow.Subscriber subscriber) { Objects.requireNonNull(subscriber, "Subscriber must not be null"); OutputStreamSubscription subscription = new OutputStreamSubscription<>(subscriber, this.outputStreamHandler, - this.byteMapper); + this.byteMapper, this.chunkSize); subscriber.onSubscribe(subscription); this.executor.execute(subscription::invokeHandler); } @@ -162,16 +213,18 @@ final class OutputStreamPublisher implements Flow.Publisher { static final Object READY = new Object(); - private final Flow.Subscriber actual; + private final Flow.Subscriber actual; private final OutputStreamHandler outputStreamHandler; private final ByteMapper byteMapper; + private final int chunkSize; + private final AtomicLong requested = new AtomicLong(); - private final AtomicReference parkedThreadAtomic = new AtomicReference<>(); + private final AtomicReference parkedThread = new AtomicReference<>(); @Nullable private volatile Throwable error; @@ -180,10 +233,11 @@ final class OutputStreamPublisher implements Flow.Publisher { public OutputStreamSubscription(Flow.Subscriber actual, OutputStreamHandler outputStreamHandler, - ByteMapper byteMapper) { + ByteMapper byteMapper, int chunkSize) { this.actual = actual; this.byteMapper = byteMapper; this.outputStreamHandler = outputStreamHandler; + this.chunkSize = chunkSize; } @Override @@ -248,7 +302,7 @@ final class OutputStreamPublisher implements Flow.Publisher { // use BufferedOutputStream, so that written bytes are buffered // before publishing as byte buffer - try (OutputStream outputStream = new BufferedOutputStream(this)) { + try (OutputStream outputStream = new BufferedOutputStream(this, this.chunkSize)) { this.outputStreamHandler.handle(outputStream); } catch (IOException ex) { @@ -323,7 +377,7 @@ final class OutputStreamPublisher implements Flow.Publisher { Thread toUnpark = Thread.currentThread(); while (true) { - Object current = this.parkedThreadAtomic.get(); + Object current = this.parkedThread.get(); if (current == READY) { break; } @@ -332,19 +386,19 @@ final class OutputStreamPublisher implements Flow.Publisher { throw new IllegalStateException("Only one (Virtual)Thread can await!"); } - if (this.parkedThreadAtomic.compareAndSet(null, toUnpark)) { + if (this.parkedThread.compareAndSet(null, toUnpark)) { LockSupport.park(); // we don't just break here because park() can wake up spuriously // if we got a proper resume, get() == READY and the loop will quit above } } // clear the resume indicator so that the next await call will park without a resume() - this.parkedThreadAtomic.lazySet(null); + this.parkedThread.lazySet(null); } private void resume() { - if (this.parkedThreadAtomic.get() != READY) { - Object old = this.parkedThreadAtomic.getAndSet(READY); + if (this.parkedThread.get() != READY) { + Object old = this.parkedThread.getAndSet(READY); if (old != READY) { LockSupport.unpark((Thread)old); } diff --git a/spring-web/src/test/java/org/springframework/http/client/OutputStreamPublisherTests.java b/spring-web/src/test/java/org/springframework/http/client/OutputStreamPublisherTests.java index 574ba987f0..466e518e58 100644 --- a/spring-web/src/test/java/org/springframework/http/client/OutputStreamPublisherTests.java +++ b/spring-web/src/test/java/org/springframework/http/client/OutputStreamPublisherTests.java @@ -17,7 +17,6 @@ package org.springframework.http.client; import java.io.OutputStreamWriter; -import java.io.Writer; import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -38,6 +37,13 @@ import static org.assertj.core.api.Assertions.assertThatIOException; */ class OutputStreamPublisherTests { + private static final byte[] FOO = "foo".getBytes(StandardCharsets.UTF_8); + + private static final byte[] BAR = "bar".getBytes(StandardCharsets.UTF_8); + + private static final byte[] BAZ = "baz".getBytes(StandardCharsets.UTF_8); + + private final Executor executor = Executors.newSingleThreadExecutor(); private final OutputStreamPublisher.ByteMapper byteMapper = @@ -59,11 +65,9 @@ class OutputStreamPublisherTests { @Test void basic() { Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { - try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { - writer.write("foo"); - writer.write("bar"); - writer.write("baz"); - } + outputStream.write(FOO); + outputStream.write(BAR); + outputStream.write(BAZ); }, this.byteMapper, this.executor); Flux flux = toString(flowPublisher); @@ -75,14 +79,12 @@ class OutputStreamPublisherTests { @Test void flush() { Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { - try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { - writer.write("foo"); - writer.flush(); - writer.write("bar"); - writer.flush(); - writer.write("baz"); - writer.flush(); - } + outputStream.write(FOO); + outputStream.flush(); + outputStream.write(BAR); + outputStream.flush(); + outputStream.write(BAZ); + outputStream.flush(); }, this.byteMapper, this.executor); Flux flux = toString(flowPublisher); @@ -93,22 +95,37 @@ class OutputStreamPublisherTests { .verifyComplete(); } + @Test + void chunkSize() { + Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { + outputStream.write(FOO); + outputStream.write(BAR); + outputStream.write(BAZ); + }, this.byteMapper, this.executor, 3); + Flux flux = toString(flowPublisher); + + StepVerifier.create(flux) + .assertNext(s -> assertThat(s).isEqualTo("foo")) + .assertNext(s -> assertThat(s).isEqualTo("bar")) + .assertNext(s -> assertThat(s).isEqualTo("baz")) + .verifyComplete(); + } + @Test void cancel() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { - try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { - assertThatIOException() - .isThrownBy(() -> { - writer.write("foo"); - writer.flush(); - writer.write("bar"); - writer.flush(); - }) - .withMessage("Subscription has been terminated"); - latch.countDown(); - } + assertThatIOException() + .isThrownBy(() -> { + outputStream.write(FOO); + outputStream.flush(); + outputStream.write(BAR); + outputStream.flush(); + }) + .withMessage("Subscription has been terminated"); + latch.countDown(); + }, this.byteMapper, this.executor); Flux flux = toString(flowPublisher); @@ -125,7 +142,7 @@ class OutputStreamPublisherTests { CountDownLatch latch = new CountDownLatch(1); Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { - Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8); + OutputStreamWriter writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8); writer.write("foo"); writer.close(); assertThatIOException().isThrownBy(() -> writer.write("bar")) @@ -146,12 +163,12 @@ class OutputStreamPublisherTests { CountDownLatch latch = new CountDownLatch(1); Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { - try(Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { - writer.write("foo"); - writer.flush(); - writer.write("foo"); - writer.flush(); - } + try (outputStream) { + outputStream.write(FOO); + outputStream.flush(); + outputStream.write(BAR); + outputStream.flush(); + } finally { latch.countDown(); }