From 3e2f58cdd27fb7572528124529b852a330cfddb1 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Wed, 30 Nov 2022 12:30:10 +0100 Subject: [PATCH] Offer restricted access to DataBuffer's ByteBuffer This commit introduces DataBuffer::readableByteBuffers and DataBuffer::writableByteBuffers, allowing restricted access to the ByteBuffer used internally by DataBuffer implementations. Closes gh-29943 --- .../core/codec/ByteBufferDecoder.java | 8 +- .../core/io/buffer/DataBuffer.java | 129 ++++++++++++++--- .../core/io/buffer/DataBufferUtils.java | 137 +++++++++++------- .../core/io/buffer/DataBufferWrapper.java | 24 ++- .../core/io/buffer/DefaultDataBuffer.java | 71 ++++++++- .../core/io/buffer/Netty5DataBuffer.java | 99 ++++++++++--- .../io/buffer/Netty5DataBufferFactory.java | 6 +- .../core/io/buffer/NettyDataBuffer.java | 81 +++++++++-- .../io/buffer/NettyDataBufferFactory.java | 14 +- .../core/io/buffer/DataBufferTests.java | 81 +++++++++-- .../core/io/buffer/DataBufferUtilsTests.java | 33 ++--- .../AbstractDataBufferAllocatingTests.java | 62 ++++++-- .../messaging/rsocket/PayloadUtils.java | 14 +- .../HttpComponentsClientHttpRequest.java | 8 +- .../client/reactive/JdkClientHttpRequest.java | 12 +- .../reactive/JettyClientHttpRequest.java | 11 +- .../http/codec/json/Jackson2Tokenizer.java | 10 +- .../http/codec/multipart/PartGenerator.java | 12 +- .../http/codec/protobuf/ProtobufDecoder.java | 11 +- .../http/codec/xml/XmlEventDecoder.java | 9 +- .../reactive/JettyHttpHandlerAdapter.java | 17 ++- .../reactive/TomcatHttpHandlerAdapter.java | 65 ++++++--- .../reactive/UndertowServerHttpResponse.java | 6 +- .../reactive/ServerHttpResponseTests.java | 16 +- .../resource/CssLinkResourceTransformer.java | 6 +- .../socket/adapter/JettyWebSocketSession.java | 36 +++-- .../adapter/StandardWebSocketSession.java | 36 +++-- .../adapter/UndertowWebSocketSession.java | 34 ++--- 28 files changed, 773 insertions(+), 275 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java index 6481ef86db..0da03706e5 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,9 +51,11 @@ public class ByteBufferDecoder extends AbstractDataBufferDecoder { public ByteBuffer decode(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - ByteBuffer result = dataBuffer.toByteBuffer(); + int len = dataBuffer.readableByteCount(); + ByteBuffer result = ByteBuffer.allocate(len); + dataBuffer.toByteBuffer(result); if (logger.isDebugEnabled()) { - logger.debug(Hints.getLogPrefix(hints) + "Read " + dataBuffer.readableByteCount() + " bytes"); + logger.debug(Hints.getLogPrefix(hints) + "Read " + len + " bytes"); } DataBufferUtils.release(dataBuffer); return result; diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java index ec41e5876d..5d3e44f15f 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,18 @@ package org.springframework.core.io.buffer; +import java.io.Closeable; import java.io.InputStream; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; +import java.util.Iterator; import java.util.function.IntPredicate; import org.springframework.util.Assert; @@ -265,27 +269,28 @@ public interface DataBuffer { CharsetEncoder charsetEncoder = charset.newEncoder() .onMalformedInput(CodingErrorAction.REPLACE) .onUnmappableCharacter(CodingErrorAction.REPLACE); - CharBuffer inBuffer = CharBuffer.wrap(charSequence); - int estimatedSize = (int) (inBuffer.remaining() * charsetEncoder.averageBytesPerChar()); - ByteBuffer outBuffer = ensureCapacity(estimatedSize) - .asByteBuffer(writePosition(), writableByteCount()); - while (true) { - CoderResult cr = (inBuffer.hasRemaining() ? - charsetEncoder.encode(inBuffer, outBuffer, true) : CoderResult.UNDERFLOW); - if (cr.isUnderflow()) { - cr = charsetEncoder.flush(outBuffer); + CharBuffer src = CharBuffer.wrap(charSequence); + int length = (int) (src.remaining() * charsetEncoder.maxBytesPerChar()); + ensureWritable(length); + try (ByteBufferIterator iterator = writableByteBuffers()) { + Assert.state(iterator.hasNext(), "No ByteBuffer available"); + ByteBuffer dest = iterator.next(); + int pos = dest.position(); + CoderResult cr = charsetEncoder.encode(src, dest, true); + if (!cr.isUnderflow()) { + cr.throwException(); } - if (cr.isUnderflow()) { - break; - } - if (cr.isOverflow()) { - writePosition(writePosition() + outBuffer.position()); - int maximumSize = (int) (inBuffer.remaining() * charsetEncoder.maxBytesPerChar()); - ensureCapacity(maximumSize); - outBuffer = asByteBuffer(writePosition(), writableByteCount()); + cr = charsetEncoder.flush(dest); + if (!cr.isUnderflow()) { + cr.throwException(); } + length = dest.position() - pos; } - writePosition(writePosition() + outBuffer.position()); + catch (CharacterCodingException ex) { + // should not happen, because the encoder uses action REPLACE + throw new UncheckedIOException(ex); + } + writePosition(writePosition() + length); } return this; } @@ -353,8 +358,8 @@ public interface DataBuffer { * changes in the returned buffer's {@linkplain ByteBuffer#position() position} * will not be reflected in the reading nor writing position of this data buffer. * @return this data buffer as a byte buffer - * @deprecated as of 6.0, in favor of {@link #toByteBuffer()}, which does - * not share data and returns a copy. + * @deprecated as of 6.0, in favor of {@link #toByteBuffer(ByteBuffer)}, + * {@link #readableByteBuffers()}, or {@link #writableByteBuffers()}. */ @Deprecated(since = "6.0") ByteBuffer asByteBuffer(); @@ -368,8 +373,8 @@ public interface DataBuffer { * @param length the length of the returned byte buffer * @return this data buffer as a byte buffer * @since 5.0.1 - * @deprecated as of 6.0, in favor of {@link #toByteBuffer(int, int)}, which - * does not share data and returns a copy. + * @deprecated as of 6.0, in favor of {@link #toByteBuffer(int, ByteBuffer, int, int)}, + * {@link #readableByteBuffers()}, or {@link #writableByteBuffers()}. */ @Deprecated(since = "6.0") ByteBuffer asByteBuffer(int index, int length); @@ -380,7 +385,11 @@ public interface DataBuffer { * not shared. * @return this data buffer as a byte buffer * @since 6.0 + * @see #readableByteBuffers() + * @see #writableByteBuffers() + * @deprecated as of 6.0.5, in favor of {@link #toByteBuffer(ByteBuffer)} */ + @Deprecated(since = "6.0.5") default ByteBuffer toByteBuffer() { return toByteBuffer(readPosition(), readableByteCount()); } @@ -391,9 +400,67 @@ public interface DataBuffer { * {@code ByteBuffer} is not shared. * @return this data buffer as a byte buffer * @since 6.0 + * @see #readableByteBuffers() + * @see #writableByteBuffers() + * @deprecated as of 6.0.5, in favor of + * {@link #toByteBuffer(int, ByteBuffer, int, int)} */ + @Deprecated(since = "6.0.5") ByteBuffer toByteBuffer(int index, int length); + /** + * Copies this entire data buffer into the given destination + * {@code ByteBuffer}, beginning at the current + * {@linkplain #readPosition() reading position}, and the current + * {@linkplain ByteBuffer#position() position} of destination byte buffer. + * @param dest the destination byte buffer + * @since 6.0.5 + */ + default void toByteBuffer(ByteBuffer dest) { + toByteBuffer(readPosition(), dest, dest.position(), readableByteCount()); + } + + /** + * Copies the given length from this data buffer into the given destination + * {@code ByteBuffer}, beginning at the given source position, and the + * given destination position in the destination byte buffer. + * @param srcPos the position of this data buffer from where copying should + * start + * @param dest the destination byte buffer + * @param destPos the position in {@code dest} to where copying should + * start + * @param length the amount of data to copy + * @since 6.0.5 + */ + void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length); + + /** + * Returns a closeable iterator over each {@link ByteBuffer} in this data + * buffer that can be read. Calling this method is more efficient than + * {@link #toByteBuffer()}, as no data is copied. However, the byte buffers + * provided can only be used during the iteration. + * + *

Note that the returned iterator must be used in a + * try-with-resources clause or explicitly + * {@linkplain ByteBufferIterator#close() closed}. + * @return a closeable iterator over the readable byte buffers contained in this data buffer + * @since 6.0.5 + */ + ByteBufferIterator readableByteBuffers(); + + /** + * Returns a closeable iterator over each {@link ByteBuffer} in this data + * buffer that can be written to. The byte buffers provided can only be used + * during the iteration. + * + *

Note that the returned iterator must be used in a + * try-with-resources clause or explicitly + * {@linkplain ByteBufferIterator#close() closed}. + * @return a closeable iterator over the writable byte buffers contained in this data buffer + * @since 6.0.5 + */ + ByteBufferIterator writableByteBuffers(); + /** * Expose this buffer's data as an {@link InputStream}. Both data and read position are * shared between the returned stream and this data buffer. The underlying buffer will @@ -450,4 +517,20 @@ public interface DataBuffer { */ String toString(int index, int length, Charset charset); + + /** + * A dedicated iterator type that ensures the lifecycle of iterated + * {@link ByteBuffer} elements. This iterator must be used in a + * try-with-resources clause or explicitly {@linkplain #close() closed}. + * + * @see DataBuffer#readableByteBuffers() + * @see DataBuffer#writableByteBuffers() + */ + interface ByteBufferIterator extends Iterator, Closeable { + + @Override + void close(); + + } + } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index d255f1e281..17d1313130 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -906,13 +906,16 @@ public abstract class DataBufferUtils { @Override public void accept(SynchronousSink sink) { - ByteBuffer byteBuffer = this.dataBufferFactory.isDirect() ? - ByteBuffer.allocateDirect(this.bufferSize) : - ByteBuffer.allocate(this.bufferSize); + int read = -1; + DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); try { - if (this.channel.read(byteBuffer) >= 0) { - byteBuffer.flip(); - DataBuffer dataBuffer = this.dataBufferFactory.wrap(byteBuffer); + try (DataBuffer.ByteBufferIterator iterator = dataBuffer.writableByteBuffers()) { + Assert.state(iterator.hasNext(), "No ByteBuffer available"); + ByteBuffer byteBuffer = iterator.next(); + read = this.channel.read(byteBuffer); + } + if (read >= 0) { + dataBuffer.writePosition(read); sink.next(dataBuffer); } else { @@ -922,11 +925,16 @@ public abstract class DataBufferUtils { catch (IOException ex) { sink.error(ex); } + finally { + if (read == -1) { + release(dataBuffer); + } + } } } - private static class ReadCompletionHandler implements CompletionHandler { + private static class ReadCompletionHandler implements CompletionHandler { private final AsynchronousFileChannel channel; @@ -978,20 +986,27 @@ public abstract class DataBufferUtils { } private void read() { - ByteBuffer byteBuffer = this.dataBufferFactory.isDirect() ? - ByteBuffer.allocateDirect(this.bufferSize) : - ByteBuffer.allocate(this.bufferSize); - this.channel.read(byteBuffer, this.position.get(), byteBuffer, this); + DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); + DataBuffer.ByteBufferIterator iterator = dataBuffer.writableByteBuffers(); + Assert.state(iterator.hasNext(), "No ByteBuffer available"); + ByteBuffer byteBuffer = iterator.next(); + Attachment attachment = new Attachment(dataBuffer, iterator); + this.channel.read(byteBuffer, this.position.get(), attachment, this); } @Override - public void completed(Integer read, ByteBuffer byteBuffer) { + public void completed(Integer read, Attachment attachment) { + attachment.iterator().close(); + DataBuffer dataBuffer = attachment.dataBuffer(); + if (this.state.get().equals(State.DISPOSED)) { + release(dataBuffer); closeChannel(this.channel); return; } if (read == -1) { + release(dataBuffer); closeChannel(this.channel); this.state.set(State.DISPOSED); this.sink.complete(); @@ -999,9 +1014,7 @@ public abstract class DataBufferUtils { } this.position.addAndGet(read); - - byteBuffer.flip(); - DataBuffer dataBuffer = this.dataBufferFactory.wrap(byteBuffer); + dataBuffer.writePosition(read); this.sink.next(dataBuffer); // Stay in READING mode if there is demand @@ -1017,7 +1030,10 @@ public abstract class DataBufferUtils { } @Override - public void failed(Throwable exc, ByteBuffer byteBuffer) { + public void failed(Throwable exc, Attachment attachment) { + attachment.iterator().close(); + release(attachment.dataBuffer()); + closeChannel(this.channel); this.state.set(State.DISPOSED); this.sink.error(exc); @@ -1026,6 +1042,8 @@ public abstract class DataBufferUtils { private enum State { IDLE, READING, DISPOSED } + + private record Attachment(DataBuffer dataBuffer, DataBuffer.ByteBufferIterator iterator) {} } @@ -1048,9 +1066,11 @@ public abstract class DataBufferUtils { @Override protected void hookOnNext(DataBuffer dataBuffer) { try { - ByteBuffer byteBuffer = dataBuffer.toByteBuffer(); - while (byteBuffer.hasRemaining()) { - this.channel.write(byteBuffer); + try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) { + ByteBuffer byteBuffer = iterator.next(); + while (byteBuffer.hasRemaining()) { + this.channel.write(byteBuffer); + } } this.sink.next(dataBuffer); request(1); @@ -1080,20 +1100,20 @@ public abstract class DataBufferUtils { private static class WriteCompletionHandler extends BaseSubscriber - implements CompletionHandler { + implements CompletionHandler { private final FluxSink sink; private final AsynchronousFileChannel channel; + private final AtomicBoolean writing = new AtomicBoolean(); + private final AtomicBoolean completed = new AtomicBoolean(); private final AtomicReference error = new AtomicReference<>(); private final AtomicLong position; - private final AtomicReference dataBuffer = new AtomicReference<>(); - public WriteCompletionHandler( FluxSink sink, AsynchronousFileChannel channel, long position) { @@ -1108,19 +1128,22 @@ public abstract class DataBufferUtils { } @Override - protected void hookOnNext(DataBuffer value) { - if (!this.dataBuffer.compareAndSet(null, value)) { - throw new IllegalStateException(); + protected void hookOnNext(DataBuffer dataBuffer) { + DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers(); + if (iterator.hasNext()) { + ByteBuffer byteBuffer = iterator.next(); + long pos = this.position.get(); + Attachment attachment = new Attachment(byteBuffer, dataBuffer, iterator); + this.writing.set(true); + this.channel.write(byteBuffer, pos, attachment, this); } - ByteBuffer byteBuffer = value.toByteBuffer(); - this.channel.write(byteBuffer, this.position.get(), byteBuffer, this); } @Override protected void hookOnError(Throwable throwable) { this.error.set(throwable); - if (this.dataBuffer.get() == null) { + if (!this.writing.get()) { this.sink.error(throwable); } } @@ -1129,43 +1152,55 @@ public abstract class DataBufferUtils { protected void hookOnComplete() { this.completed.set(true); - if (this.dataBuffer.get() == null) { + if (!this.writing.get()) { this.sink.complete(); } } @Override - public void completed(Integer written, ByteBuffer byteBuffer) { - long pos = this.position.addAndGet(written); - if (byteBuffer.hasRemaining()) { - this.channel.write(byteBuffer, pos, byteBuffer, this); - return; - } - sinkDataBuffer(); + public void completed(Integer written, Attachment attachment) { + this.writing.set(false); + attachment.iterator().close(); - Throwable throwable = this.error.get(); - if (throwable != null) { - this.sink.error(throwable); + long pos = this.position.addAndGet(written); + ByteBuffer byteBuffer = attachment.byteBuffer(); + DataBuffer.ByteBufferIterator iterator = attachment.iterator(); + if (byteBuffer.hasRemaining()) { + this.writing.set(true); + this.channel.write(byteBuffer, pos, attachment, this); } - else if (this.completed.get()) { - this.sink.complete(); + else if (iterator.hasNext()) { + ByteBuffer next = iterator.next(); + this.writing.set(true); + this.channel.write(next, pos, attachment, this); } else { - request(1); + sinkDataBuffer(attachment.dataBuffer()); + + Throwable throwable = this.error.get(); + if (throwable != null) { + this.sink.error(throwable); + } + else if (this.completed.get()) { + this.sink.complete(); + } + else { + request(1); + } } } @Override - public void failed(Throwable exc, ByteBuffer byteBuffer) { - sinkDataBuffer(); + public void failed(Throwable exc, Attachment attachment) { + this.writing.set(false); + attachment.iterator().close(); + + sinkDataBuffer(attachment.dataBuffer()); this.sink.error(exc); } - private void sinkDataBuffer() { - DataBuffer dataBuffer = this.dataBuffer.get(); - Assert.state(dataBuffer != null, "DataBuffer should not be null"); + private void sinkDataBuffer(DataBuffer dataBuffer) { this.sink.next(dataBuffer); - this.dataBuffer.set(null); } @Override @@ -1173,6 +1208,10 @@ public abstract class DataBufferUtils { return Context.of(this.sink.contextView()); } + private record Attachment(ByteBuffer byteBuffer, DataBuffer dataBuffer, DataBuffer.ByteBufferIterator iterator) {} + + } + } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java index 0ef9c42614..1de857f629 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -202,15 +202,37 @@ public class DataBufferWrapper implements DataBuffer { } @Override + @Deprecated public ByteBuffer toByteBuffer() { return this.delegate.toByteBuffer(); } @Override + @Deprecated public ByteBuffer toByteBuffer(int index, int length) { return this.delegate.toByteBuffer(index, length); } + @Override + public void toByteBuffer(ByteBuffer dest) { + this.delegate.toByteBuffer(dest); + } + + @Override + public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) { + this.delegate.toByteBuffer(srcPos, dest, destPos, length); + } + + @Override + public ByteBufferIterator readableByteBuffers() { + return this.delegate.readableByteBuffers(); + } + + @Override + public ByteBufferIterator writableByteBuffers() { + return this.delegate.writableByteBuffers(); + } + @Override public InputStream asInputStream() { return this.delegate.asInputStream(); diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java index 23bdbd0cf8..c60583a573 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ package org.springframework.core.io.buffer; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Arrays; +import java.util.NoSuchElementException; import java.util.function.IntPredicate; import org.springframework.lang.Nullable; @@ -304,9 +305,14 @@ public class DefaultDataBuffer implements DataBuffer { } @Override - public DefaultDataBuffer write(DataBuffer... buffers) { - if (!ObjectUtils.isEmpty(buffers)) { - write(Arrays.stream(buffers).map(DataBuffer::toByteBuffer).toArray(ByteBuffer[]::new)); + public DefaultDataBuffer write(DataBuffer... dataBuffers) { + if (!ObjectUtils.isEmpty(dataBuffers)) { + ByteBuffer[] byteBuffers = new ByteBuffer[dataBuffers.length]; + for (int i = 0; i < dataBuffers.length; i++) { + byteBuffers[i] = ByteBuffer.allocate(dataBuffers[i].readableByteCount()); + dataBuffers[i].toByteBuffer(byteBuffers[i]); + } + write(byteBuffers); } return this; } @@ -388,6 +394,7 @@ public class DefaultDataBuffer implements DataBuffer { } @Override + @Deprecated public ByteBuffer toByteBuffer(int index, int length) { checkIndex(index, length); @@ -398,6 +405,29 @@ public class DefaultDataBuffer implements DataBuffer { return copy.flip(); } + @Override + public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) { + checkIndex(srcPos, length); + Assert.notNull(dest, "Dest must not be null"); + + dest = dest.duplicate().clear(); + dest.put(destPos, this.byteBuffer, srcPos, length); + } + + @Override + public DataBuffer.ByteBufferIterator readableByteBuffers() { + ByteBuffer readOnly = this.byteBuffer.asReadOnlyBuffer(); + readOnly.clear().position(this.readPosition).limit(this.writePosition - this.readPosition); + return new ByteBufferIterator(readOnly); + } + + @Override + public DataBuffer.ByteBufferIterator writableByteBuffers() { + ByteBuffer duplicate = this.byteBuffer.duplicate(); + duplicate.clear().position(this.writePosition).limit(this.capacity - this.writePosition); + return new ByteBufferIterator(duplicate); + } + @Override public String toString(int index, int length, Charset charset) { checkIndex(index, length); @@ -512,4 +542,37 @@ public class DefaultDataBuffer implements DataBuffer { } } + + private static final class ByteBufferIterator implements DataBuffer.ByteBufferIterator { + + private final ByteBuffer buffer; + + private boolean hasNext = true; + + + public ByteBufferIterator(ByteBuffer buffer) { + this.buffer = buffer; + } + + @Override + public boolean hasNext() { + return this.hasNext; + } + + @Override + public ByteBuffer next() { + if (!this.hasNext) { + throw new NoSuchElementException(); + } + else { + this.hasNext = false; + return this.buffer; + } + } + + @Override + public void close() { + } + } + } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBuffer.java index 6d79ef1667..2514ee6afb 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,11 +18,12 @@ package org.springframework.core.io.buffer; import java.nio.ByteBuffer; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; +import java.util.NoSuchElementException; import java.util.function.IntPredicate; import io.netty5.buffer.Buffer; -import io.netty5.util.AsciiString; +import io.netty5.buffer.BufferComponent; +import io.netty5.buffer.ComponentIterator; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -188,20 +189,20 @@ public final class Netty5DataBuffer implements CloseableDataBuffer, TouchableDat } @Override - public Netty5DataBuffer write(DataBuffer... buffers) { - if (!ObjectUtils.isEmpty(buffers)) { - if (hasNetty5DataBuffers(buffers)) { - Buffer[] nativeBuffers = new Buffer[buffers.length]; - for (int i = 0; i < buffers.length; i++) { - nativeBuffers[i] = ((Netty5DataBuffer) buffers[i]).getNativeBuffer(); + public Netty5DataBuffer write(DataBuffer... dataBuffers) { + if (!ObjectUtils.isEmpty(dataBuffers)) { + if (hasNetty5DataBuffers(dataBuffers)) { + Buffer[] nativeBuffers = new Buffer[dataBuffers.length]; + for (int i = 0; i < dataBuffers.length; i++) { + nativeBuffers[i] = ((Netty5DataBuffer) dataBuffers[i]).getNativeBuffer(); } return write(nativeBuffers); } else { - ByteBuffer[] byteBuffers = new ByteBuffer[buffers.length]; - for (int i = 0; i < buffers.length; i++) { - byteBuffers[i] = buffers[i].toByteBuffer(); - + ByteBuffer[] byteBuffers = new ByteBuffer[dataBuffers.length]; + for (int i = 0; i < dataBuffers.length; i++) { + byteBuffers[i] = ByteBuffer.allocate(dataBuffers[i].readableByteCount()); + dataBuffers[i].toByteBuffer(byteBuffers[i]); } return write(byteBuffers); } @@ -248,13 +249,7 @@ public final class Netty5DataBuffer implements CloseableDataBuffer, TouchableDat Assert.notNull(charSequence, "CharSequence must not be null"); Assert.notNull(charset, "Charset must not be null"); - if (StandardCharsets.US_ASCII.equals(charset) && charSequence instanceof AsciiString asciiString) { - this.buffer.writeBytes(asciiString.array(), asciiString.arrayOffset(), asciiString.length()); - } - else { - byte[] bytes = charSequence.toString().getBytes(charset); - this.buffer.writeBytes(bytes); - } + this.buffer.writeCharSequence(charSequence, charset); return this; } @@ -300,6 +295,21 @@ public final class Netty5DataBuffer implements CloseableDataBuffer, TouchableDat return copy; } + @Override + public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) { + this.buffer.copyInto(srcPos, dest, destPos, length); + } + + @Override + public ByteBufferIterator readableByteBuffers() { + return new BufferComponentIterator<>(this.buffer.forEachComponent(), true); + } + + @Override + public ByteBufferIterator writableByteBuffers() { + return new BufferComponentIterator<>(this.buffer.forEachComponent(), false); + } + @Override public String toString(Charset charset) { Assert.notNull(charset, "Charset must not be null"); @@ -342,4 +352,53 @@ public final class Netty5DataBuffer implements CloseableDataBuffer, TouchableDat return this.buffer.toString(); } + + private static final class BufferComponentIterator + implements ByteBufferIterator { + + private final ComponentIterator delegate; + + private final boolean readable; + + @Nullable + private T next; + + + public BufferComponentIterator(ComponentIterator delegate, boolean readable) { + Assert.notNull(delegate, "Delegate must not be null"); + this.delegate = delegate; + this.readable = readable; + this.next = readable ? this.delegate.firstReadable() : this.delegate.firstWritable(); + } + + @Override + public boolean hasNext() { + return this.next != null; + } + + @Override + public ByteBuffer next() { + if (this.next != null) { + ByteBuffer result; + if (this.readable) { + result = this.next.readableBuffer(); + this.next = this.next.nextReadable(); + } + else { + result = this.next.writableBuffer(); + this.next = this.next.nextWritable(); + } + return result; + } + else { + throw new NoSuchElementException(); + } + } + + @Override + public void close() { + this.delegate.close(); + } + } + } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBufferFactory.java index 9095d4f7ad..7163274b61 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBufferFactory.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBufferFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -127,7 +127,9 @@ public class Netty5DataBufferFactory implements DataBufferFactory { return netty5DataBuffer.getNativeBuffer(); } else { - return DefaultBufferAllocators.preferredAllocator().copyOf(buffer.toByteBuffer()); + ByteBuffer byteBuffer = ByteBuffer.allocate(buffer.readableByteCount()); + buffer.toByteBuffer(byteBuffer); + return DefaultBufferAllocators.preferredAllocator().copyOf(byteBuffer); } } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java index 6593e6d976..9e35a3591d 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ package org.springframework.core.io.buffer; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.NoSuchElementException; import java.util.function.IntPredicate; import io.netty.buffer.ByteBuf; @@ -182,19 +183,20 @@ public class NettyDataBuffer implements PooledDataBuffer { } @Override - public NettyDataBuffer write(DataBuffer... buffers) { - if (!ObjectUtils.isEmpty(buffers)) { - if (hasNettyDataBuffers(buffers)) { - ByteBuf[] nativeBuffers = new ByteBuf[buffers.length]; - for (int i = 0; i < buffers.length; i++) { - nativeBuffers[i] = ((NettyDataBuffer) buffers[i]).getNativeBuffer(); + public NettyDataBuffer write(DataBuffer... dataBuffers) { + if (!ObjectUtils.isEmpty(dataBuffers)) { + if (hasNettyDataBuffers(dataBuffers)) { + ByteBuf[] nativeBuffers = new ByteBuf[dataBuffers.length]; + for (int i = 0; i < dataBuffers.length; i++) { + nativeBuffers[i] = ((NettyDataBuffer) dataBuffers[i]).getNativeBuffer(); } write(nativeBuffers); } else { - ByteBuffer[] byteBuffers = new ByteBuffer[buffers.length]; - for (int i = 0; i < buffers.length; i++) { - byteBuffers[i] = buffers[i].toByteBuffer(); + ByteBuffer[] byteBuffers = new ByteBuffer[dataBuffers.length]; + for (int i = 0; i < dataBuffers.length; i++) { + byteBuffers[i] = ByteBuffer.allocate(dataBuffers[i].readableByteCount()); + dataBuffers[i].toByteBuffer(byteBuffers[i]); } write(byteBuffers); } @@ -295,6 +297,7 @@ public class NettyDataBuffer implements PooledDataBuffer { } @Override + @Deprecated public ByteBuffer toByteBuffer(int index, int length) { ByteBuffer result = this.byteBuf.isDirect() ? ByteBuffer.allocateDirect(length) : @@ -305,6 +308,26 @@ public class NettyDataBuffer implements PooledDataBuffer { return result.flip(); } + @Override + public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) { + Assert.notNull(dest, "Dest must not be null"); + + dest = dest.duplicate().clear(); + dest.put(destPos, this.byteBuf.nioBuffer(), srcPos, length); + } + + @Override + public DataBuffer.ByteBufferIterator readableByteBuffers() { + ByteBuffer[] readable = this.byteBuf.nioBuffers(this.byteBuf.readerIndex(), this.byteBuf.readableBytes()); + return new ByteBufferIterator(readable, true); + } + + @Override + public DataBuffer.ByteBufferIterator writableByteBuffers() { + ByteBuffer[] writable = this.byteBuf.nioBuffers(this.byteBuf.writerIndex(), this.byteBuf.writableBytes()); + return new ByteBufferIterator(writable, false); + } + @Override public String toString(Charset charset) { Assert.notNull(charset, "Charset must not be null"); @@ -355,4 +378,42 @@ public class NettyDataBuffer implements PooledDataBuffer { return this.byteBuf.toString(); } + + private static final class ByteBufferIterator implements DataBuffer.ByteBufferIterator { + + private final ByteBuffer[] byteBuffers; + + private final boolean readOnly; + + private int cursor = 0; + + + public ByteBufferIterator(ByteBuffer[] byteBuffers, boolean readOnly) { + this.byteBuffers = byteBuffers; + this.readOnly = readOnly; + } + + @Override + public boolean hasNext() { + return this.cursor < this.byteBuffers.length; + } + + @Override + public ByteBuffer next() { + int index = this.cursor; + if (index < this.byteBuffers.length) { + this.cursor = index + 1; + ByteBuffer next = this.byteBuffers[index]; + return this.readOnly ? next.asReadOnlyBuffer() : next; + } + else { + throw new NoSuchElementException(); + } + } + + @Override + public void close() { + } + } + } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java index 885d8f6f9c..40082b4371 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -122,17 +122,19 @@ public class NettyDataBufferFactory implements DataBufferFactory { /** * Return the given Netty {@link DataBuffer} as a {@link ByteBuf}. *

Returns the {@linkplain NettyDataBuffer#getNativeBuffer() native buffer} - * if {@code buffer} is a {@link NettyDataBuffer}; returns + * if {@code dataBuffer} is a {@link NettyDataBuffer}; returns * {@link Unpooled#wrappedBuffer(ByteBuffer)} otherwise. - * @param buffer the {@code DataBuffer} to return a {@code ByteBuf} for + * @param dataBuffer the {@code DataBuffer} to return a {@code ByteBuf} for * @return the netty {@code ByteBuf} */ - public static ByteBuf toByteBuf(DataBuffer buffer) { - if (buffer instanceof NettyDataBuffer nettyDataBuffer) { + public static ByteBuf toByteBuf(DataBuffer dataBuffer) { + if (dataBuffer instanceof NettyDataBuffer nettyDataBuffer) { return nettyDataBuffer.getNativeBuffer(); } else { - return Unpooled.wrappedBuffer(buffer.toByteBuffer()); + ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount()); + dataBuffer.toByteBuffer(byteBuffer); + return Unpooled.wrappedBuffer(byteBuffer); } } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java index 6cd9c4bc9b..2c542b202f 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.core.io.buffer; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -649,6 +650,71 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { release(buffer); } + @ParameterizedDataBufferAllocatingTest + void toByteBufferDestination(DataBufferFactory bufferFactory) { + super.bufferFactory = bufferFactory; + + DataBuffer buffer = createDataBuffer(4); + buffer.write(new byte[]{'a', 'b', 'c'}); + + ByteBuffer byteBuffer = createByteBuffer(2); + buffer.toByteBuffer(1, byteBuffer, 0, 2); + assertThat(byteBuffer.capacity()).isEqualTo(2); + assertThat(byteBuffer.remaining()).isEqualTo(2); + + byte[] resultBytes = new byte[2]; + byteBuffer.get(resultBytes); + assertThat(resultBytes).isEqualTo(new byte[]{'b', 'c'}); + + assertThatExceptionOfType(IndexOutOfBoundsException.class) + .isThrownBy(() -> buffer.toByteBuffer(0, byteBuffer, 0, 3)); + + release(buffer); + } + + @ParameterizedDataBufferAllocatingTest + void readableByteBuffers(DataBufferFactory bufferFactory) throws IOException { + super.bufferFactory = bufferFactory; + + DataBuffer dataBuffer = this.bufferFactory.join(Arrays.asList(stringBuffer("a"), + stringBuffer("b"), stringBuffer("c"))); + + byte[] result = new byte[3]; + try (var iterator = dataBuffer.readableByteBuffers()) { + assertThat(iterator).hasNext(); + int i = 0; + while (iterator.hasNext()) { + ByteBuffer byteBuffer = iterator.next(); + int len = byteBuffer.remaining(); + byteBuffer.get(result, i, len); + i += len; + assertThatException().isThrownBy(() -> byteBuffer.put((byte) 'd')); + } + } + + assertThat(result).containsExactly('a', 'b', 'c'); + + release(dataBuffer); + } + + @ParameterizedDataBufferAllocatingTest + void writableByteBuffers(DataBufferFactory bufferFactory) { + super.bufferFactory = bufferFactory; + + DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(1); + + try (DataBuffer.ByteBufferIterator iterator = dataBuffer.writableByteBuffers()) { + assertThat(iterator).hasNext(); + ByteBuffer byteBuffer = iterator.next(); + byteBuffer.put((byte) 'a'); + dataBuffer.writePosition(1); + + assertThat(iterator).isExhausted(); + } + assertThat(dataBuffer.read()).isEqualTo((byte) 'a'); + + release(dataBuffer); + } @ParameterizedDataBufferAllocatingTest void indexOf(DataBufferFactory bufferFactory) { @@ -738,6 +804,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { @ParameterizedDataBufferAllocatingTest @SuppressWarnings("deprecation") void retainedSlice(DataBufferFactory bufferFactory) { + assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, + "Netty 5 does not support retainedSlice"); + super.bufferFactory = bufferFactory; DataBuffer buffer = createDataBuffer(3); @@ -757,12 +826,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { result = new byte[2]; slice.read(result); - if (!(bufferFactory instanceof Netty5DataBufferFactory)) { - assertThat(result).isEqualTo(new byte[]{'b', 'c'}); - } - else { - assertThat(result).isEqualTo(new byte[]{'b', 0}); - } + assertThat(result).isEqualTo(new byte[]{'b', 'c'}); release(buffer, slice); } @@ -822,7 +886,6 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { assertThat(bytes).isEqualTo(new byte[]{'b', 'c'}); - DataBuffer buffer2 = createDataBuffer(1); buffer2.write(new byte[]{'a'}); DataBuffer split2 = buffer2.split(1); @@ -853,7 +916,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { byte[] bytes = new byte[3]; composite.read(bytes); - assertThat(bytes).isEqualTo(new byte[] {'a','b','c'}); + assertThat(bytes).isEqualTo(new byte[]{'a', 'b', 'c'}); release(composite); } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 859732af8d..17c355f2d8 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -54,7 +54,6 @@ import org.springframework.core.testfixture.io.buffer.AbstractDataBufferAllocati import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.isA; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.mock; @@ -115,7 +114,7 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests { DataBufferUtils.readByteChannel(() -> channel, super.bufferFactory, 3); StepVerifier.create(result) - .consumeNextWith(stringConsumer("")) + .consumeNextWith(stringConsumer("foo")) .expectError(IOException.class) .verify(Duration.ofSeconds(3)); } @@ -172,18 +171,19 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests { byteBuffer.put("foo".getBytes(StandardCharsets.UTF_8)); long pos = invocation.getArgument(1); assertThat(pos).isEqualTo(0); - CompletionHandler completionHandler = invocation.getArgument(3); - completionHandler.completed(3, byteBuffer); + Object attachment = invocation.getArgument(2); + CompletionHandler completionHandler = invocation.getArgument(3); + completionHandler.completed(3, attachment); return null; }).willAnswer(invocation -> { - ByteBuffer byteBuffer = invocation.getArgument(0); - CompletionHandler completionHandler = invocation.getArgument(3); - completionHandler.failed(new IOException(), byteBuffer); + Object attachment = invocation.getArgument(2); + CompletionHandler completionHandler = invocation.getArgument(3); + completionHandler.failed(new IOException(), attachment); return null; }) .given(channel).read(any(), anyLong(), any(), any()); - Flux result = + Flux result= DataBufferUtils.readAsynchronousFileChannel(() -> channel, super.bufferFactory, 3); StepVerifier.create(result) @@ -474,24 +474,21 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests { willAnswer(invocation -> { ByteBuffer buffer = invocation.getArgument(0); long pos = invocation.getArgument(1); - CompletionHandler completionHandler = invocation.getArgument(3); - assertThat(pos).isEqualTo(0); - + Object attachment = invocation.getArgument(2); + CompletionHandler completionHandler = invocation.getArgument(3); int written = buffer.remaining(); buffer.position(buffer.limit()); - completionHandler.completed(written, buffer); - + completionHandler.completed(written, attachment); return null; }) .willAnswer(invocation -> { - ByteBuffer buffer = invocation.getArgument(0); - CompletionHandler completionHandler = - invocation.getArgument(3); - completionHandler.failed(new IOException(), buffer); + Object attachment = invocation.getArgument(2); + CompletionHandler completionHandler = invocation.getArgument(3); + completionHandler.failed(new IOException(), attachment); return null; }) - .given(channel).write(isA(ByteBuffer.class), anyLong(), isA(ByteBuffer.class), isA(CompletionHandler.class)); + .given(channel).write(any(), anyLong(), any(), any()); Flux writeResult = DataBufferUtils.write(flux, channel); StepVerifier.create(writeResult) diff --git a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java index 15eebbd5f8..fe37e04f88 100644 --- a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java +++ b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,8 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocatorMetric; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty5.buffer.BufferAllocator; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; @@ -64,6 +66,23 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; */ public abstract class AbstractDataBufferAllocatingTests { + private static BufferAllocator netty5OnHeapUnpooled; + + private static BufferAllocator netty5OffHeapUnpooled; + + private static BufferAllocator netty5OffHeapPooled; + + private static BufferAllocator netty5OnHeapPooled; + + private static UnpooledByteBufAllocator netty4OffHeapUnpooled; + + private static UnpooledByteBufAllocator netty4OnHeapUnpooled; + + private static PooledByteBufAllocator netty4OffHeapPooled; + + private static PooledByteBufAllocator netty4OnHeapPooled; + + @RegisterExtension AfterEachCallback leakDetector = context -> waitForDataBufferRelease(Duration.ofSeconds(2)); @@ -153,6 +172,28 @@ public abstract class AbstractDataBufferAllocatingTests { return metrics.stream().mapToLong(PoolArenaMetric::numActiveAllocations).sum(); } + @BeforeAll + @SuppressWarnings("deprecation") // PooledByteBufAllocator no longer supports tinyCacheSize. + public static void createAllocators() { + netty4OnHeapUnpooled = new UnpooledByteBufAllocator(false); + netty4OffHeapUnpooled = new UnpooledByteBufAllocator(true); + netty4OnHeapPooled = new PooledByteBufAllocator(false, 1, 1, 4096, 4, 0, 0, 0, true); + netty4OffHeapPooled = new PooledByteBufAllocator(true, 1, 1, 4096, 4, 0, 0, 0, true); + + netty5OnHeapUnpooled = BufferAllocator.onHeapUnpooled(); + netty5OffHeapUnpooled = BufferAllocator.offHeapUnpooled(); + netty5OnHeapPooled = BufferAllocator.onHeapPooled(); + netty5OffHeapPooled = BufferAllocator.offHeapPooled(); + } + + @AfterAll + public static void closeAllocators() { + netty5OnHeapUnpooled.close(); + netty5OffHeapUnpooled.close(); + netty5OnHeapPooled.close(); + netty5OffHeapPooled.close(); + } + @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @@ -161,29 +202,26 @@ public abstract class AbstractDataBufferAllocatingTests { public @interface ParameterizedDataBufferAllocatingTest { } - @SuppressWarnings("deprecation") // PooledByteBufAllocator no longer supports tinyCacheSize. public static Stream dataBufferFactories() { return Stream.of( // Netty 4 arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = true", - new NettyDataBufferFactory(new UnpooledByteBufAllocator(true)))), + new NettyDataBufferFactory(netty4OffHeapUnpooled))), arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = false", - new NettyDataBufferFactory(new UnpooledByteBufAllocator(false)))), - // 1) Disable caching for reliable leak detection, see https://github.com/netty/netty/issues/5275 - // 2) maxOrder is 4 (vs default 11) but can be increased if necessary + new NettyDataBufferFactory(netty4OnHeapUnpooled))), arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = true", - new NettyDataBufferFactory(new PooledByteBufAllocator(true, 1, 1, 4096, 4, 0, 0, 0, true)))), + new NettyDataBufferFactory(netty4OffHeapPooled))), arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = false", - new NettyDataBufferFactory(new PooledByteBufAllocator(false, 1, 1, 4096, 4, 0, 0, 0, true)))), + new NettyDataBufferFactory(netty4OnHeapPooled))), // Netty 5 arguments(named("Netty5DataBufferFactory - BufferAllocator.onHeapUnpooled()", - new Netty5DataBufferFactory(BufferAllocator.onHeapUnpooled()))), + new Netty5DataBufferFactory(netty5OnHeapUnpooled))), arguments(named("Netty5DataBufferFactory - BufferAllocator.offHeapUnpooled()", - new Netty5DataBufferFactory(BufferAllocator.offHeapUnpooled()))), + new Netty5DataBufferFactory(netty5OffHeapUnpooled))), arguments(named("Netty5DataBufferFactory - BufferAllocator.onHeapPooled()", - new Netty5DataBufferFactory(BufferAllocator.onHeapPooled()))), + new Netty5DataBufferFactory(netty5OnHeapPooled))), arguments(named("Netty5DataBufferFactory - BufferAllocator.offHeapPooled()", - new Netty5DataBufferFactory(BufferAllocator.offHeapPooled()))), + new Netty5DataBufferFactory(netty5OffHeapPooled))), // Default arguments(named("DefaultDataBufferFactory - preferDirect = true", new DefaultDataBufferFactory(true))), diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java index 1591e47b12..2793bff567 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -99,9 +99,15 @@ public abstract class PayloadUtils { return NettyDataBufferFactory.toByteBuf(buffer); } - private static ByteBuffer asByteBuffer(DataBuffer buffer) { - return buffer instanceof DefaultDataBuffer ? - ((DefaultDataBuffer) buffer).getNativeBuffer() : buffer.toByteBuffer(); + private static ByteBuffer asByteBuffer(DataBuffer dataBuffer) { + if (dataBuffer instanceof DefaultDataBuffer defaultDataBuffer) { + return defaultDataBuffer.getNativeBuffer(); + } + else { + ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount()); + dataBuffer.toByteBuffer(byteBuffer); + return byteBuffer; + } } } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java index 1b944bebca..9a87c252ab 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -102,7 +102,11 @@ class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono writeWith(Publisher body) { return doCommit(() -> { - this.byteBufferFlux = Flux.from(body).map(DataBuffer::toByteBuffer); + this.byteBufferFlux = Flux.from(body).map(dataBuffer -> { + ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount()); + dataBuffer.toByteBuffer(byteBuffer); + return byteBuffer; + }); return Mono.empty(); }); } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java index afe8355317..faf65dc9ed 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -122,8 +122,8 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest { private HttpRequest.BodyPublisher toBodyPublisher(Publisher body) { Publisher byteBufferBody = (body instanceof Mono ? - Mono.from(body).map(DataBuffer::toByteBuffer) : - Flux.from(body).map(DataBuffer::toByteBuffer)); + Mono.from(body).map(this::toByteBuffer) : + Flux.from(body).map(this::toByteBuffer)); Flow.Publisher bodyFlow = JdkFlowAdapter.publisherToFlowPublisher(byteBufferBody); @@ -132,6 +132,12 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest { HttpRequest.BodyPublishers.fromPublisher(bodyFlow)); } + private ByteBuffer toByteBuffer(DataBuffer dataBuffer) { + ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount()); + dataBuffer.toByteBuffer(byteBuffer); + return byteBuffer; + } + @Override public Mono writeAndFlushWith(final Publisher> body) { return writeWith(Flux.from(body).flatMap(Function.identity())); diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java index 35e610f90f..bac8740c65 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java @@ -18,6 +18,7 @@ package org.springframework.http.client.reactive; import java.net.HttpCookie; import java.net.URI; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.function.Function; @@ -109,15 +110,17 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { return contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE; } - private ContentChunk toContentChunk(DataBuffer buffer, MonoSink sink) { - return new ContentChunk(buffer.toByteBuffer(), new Callback() { + private ContentChunk toContentChunk(DataBuffer dataBuffer, MonoSink sink) { + ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount()); + dataBuffer.toByteBuffer(byteBuffer); + return new ContentChunk(byteBuffer, new Callback() { @Override public void succeeded() { - DataBufferUtils.release(buffer); + DataBufferUtils.release(dataBuffer); } @Override public void failed(Throwable t) { - DataBufferUtils.release(buffer); + DataBufferUtils.release(dataBuffer); sink.error(t); } }); diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java index 2a8672ca71..75f3ed8271 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ package org.springframework.http.codec.json; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.function.Function; @@ -93,8 +92,11 @@ final class Jackson2Tokenizer { try { int bufferSize = dataBuffer.readableByteCount(); if (this.inputFeeder instanceof ByteBufferFeeder byteBufferFeeder) { - ByteBuffer byteBuffer = dataBuffer.toByteBuffer(); - byteBufferFeeder.feedInput(byteBuffer); + try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) { + while (iterator.hasNext()) { + byteBufferFeeder.feedInput(iterator.next()); + } + } } else if (this.inputFeeder instanceof ByteArrayFeeder byteArrayFeeder) { byte[] bytes = new byte[bufferSize]; diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java index 2acc1c49db..c00189e068 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -747,9 +747,13 @@ final class PartGenerator extends BaseSubscriber { @SuppressWarnings("BlockingMethodInNonBlockingContext") private Mono writeInternal(DataBuffer dataBuffer) { try { - ByteBuffer byteBuffer = dataBuffer.toByteBuffer(); - while (byteBuffer.hasRemaining()) { - this.channel.write(byteBuffer); + try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) { + while (iterator.hasNext()) { + ByteBuffer byteBuffer = iterator.next(); + while (byteBuffer.hasRemaining()) { + this.channel.write(byteBuffer); + } + } } return Mono.empty(); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java index d22819dbee..c48a4f00b9 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -151,8 +151,9 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder { public List apply(DataBuffer dataBuffer) { try { increaseByteCount(dataBuffer); - this.streamReader.getInputFeeder().feedInput(dataBuffer.toByteBuffer()); + AsyncByteBufferFeeder inputFeeder = this.streamReader.getInputFeeder(); + try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) { + while (iterator.hasNext()) { + inputFeeder.feedInput(iterator.next()); + } + } List events = new ArrayList<>(); while (true) { if (this.streamReader.next() == AsyncXMLStreamReader.EVENT_INCOMPLETE) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java index 694bf286f8..e43f7bb24a 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ package org.springframework.http.server.reactive; import java.io.IOException; -import java.io.OutputStream; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -158,11 +157,15 @@ public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter { @Override protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException { - OutputStream output = getOutputStream(); - if (output instanceof HttpOutput httpOutput) { - ByteBuffer input = dataBuffer.toByteBuffer(); - int len = input.remaining(); - httpOutput.write(input); + if (getOutputStream() instanceof HttpOutput httpOutput) { + int len = 0; + try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) { + while (iterator.hasNext() && httpOutput.isReady()) { + ByteBuffer byteBuffer = iterator.next(); + len += byteBuffer.remaining(); + httpOutput.write(byteBuffer); + } + } return len; } return super.writeToOutputStream(dataBuffer); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java index f302eb5f44..d93b4b8900 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.apache.coyote.Response; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.util.Assert; import org.springframework.util.MultiValueMap; @@ -125,26 +126,38 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter { @Override protected DataBuffer readFromInputStream() throws IOException { - if (!(getInputStream() instanceof CoyoteInputStream coyoteInputStream)) { + if (getInputStream() instanceof CoyoteInputStream coyoteInputStream) { + DataBuffer dataBuffer = this.factory.allocateBuffer(this.bufferSize); + int read = -1; + try { + try (DataBuffer.ByteBufferIterator iterator = dataBuffer.writableByteBuffers()) { + Assert.state(iterator.hasNext(), "No ByteBuffer available"); + ByteBuffer byteBuffer = iterator.next(); + read = coyoteInputStream.read(byteBuffer); + } + logBytesRead(read); + if (read > 0) { + dataBuffer.writePosition(read); + return dataBuffer; + } + else if (read == -1) { + return EOF_BUFFER; + } + else { + return AbstractListenerReadPublisher.EMPTY_BUFFER; + } + } + finally { + if (read <= 0) { + DataBufferUtils.release(dataBuffer); + } + } + } + else { // It's possible InputStream can be wrapped, preventing use of CoyoteInputStream return super.readFromInputStream(); } - ByteBuffer byteBuffer = this.factory.isDirect() ? - ByteBuffer.allocateDirect(this.bufferSize) : - ByteBuffer.allocate(this.bufferSize); - - int read = coyoteInputStream.read(byteBuffer); - logBytesRead(read); - if (read > 0) { - return this.factory.wrap(byteBuffer); - } - else if (read == -1) { - return EOF_BUFFER; - } - else { - return AbstractListenerReadPublisher.EMPTY_BUFFER; - } } } @@ -197,14 +210,20 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter { @Override protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException { - if (!(getOutputStream() instanceof CoyoteOutputStream coyoteOutputStream)) { + if (getOutputStream() instanceof CoyoteOutputStream coyoteOutputStream) { + int len = 0; + try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) { + while (iterator.hasNext() && coyoteOutputStream.isReady()) { + ByteBuffer byteBuffer = iterator.next(); + len += byteBuffer.remaining(); + coyoteOutputStream.write(byteBuffer); + } + } + return len; + } + else { return super.writeToOutputStream(dataBuffer); } - - ByteBuffer input = dataBuffer.toByteBuffer(); - int len = input.remaining(); - coyoteOutputStream.write(input); - return len; } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index bf6420c436..78272ab3f8 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -230,7 +230,9 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl @Override protected void dataReceived(DataBuffer dataBuffer) { super.dataReceived(dataBuffer); - this.byteBuffer = dataBuffer.toByteBuffer(); + ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount()); + dataBuffer.toByteBuffer(byteBuffer); + this.byteBuffer = byteBuffer; } @Override diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java index e907e7ca7a..cfcb025399 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,9 +68,9 @@ class ServerHttpResponseTests { assertThat(response.cookiesWritten).isTrue(); assertThat(response.body).hasSize(3); - assertThat(new String(response.body.get(0).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("a"); - assertThat(new String(response.body.get(1).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("b"); - assertThat(new String(response.body.get(2).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("c"); + assertThat(response.body.get(0).toString(StandardCharsets.UTF_8)).isEqualTo("a"); + assertThat(response.body.get(1).toString(StandardCharsets.UTF_8)).isEqualTo("b"); + assertThat(response.body.get(2).toString(StandardCharsets.UTF_8)).isEqualTo("c"); } @Test // SPR-14952 @@ -84,7 +84,7 @@ class ServerHttpResponseTests { assertThat(response.cookiesWritten).isTrue(); assertThat(response.body).hasSize(1); - assertThat(new String(response.body.get(0).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("foo"); + assertThat(response.body.get(0).toString(StandardCharsets.UTF_8)).isEqualTo("foo"); } @Test @@ -139,9 +139,9 @@ class ServerHttpResponseTests { assertThat(response.getCookies().getFirst("ID")).isSameAs(cookie); assertThat(response.body).hasSize(3); - assertThat(new String(response.body.get(0).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("a"); - assertThat(new String(response.body.get(1).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("b"); - assertThat(new String(response.body.get(2).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("c"); + assertThat(response.body.get(0).toString(StandardCharsets.UTF_8)).isEqualTo("a"); + assertThat(response.body.get(1).toString(StandardCharsets.UTF_8)).isEqualTo("b"); + assertThat(response.body.get(2).toString(StandardCharsets.UTF_8)).isEqualTo("c"); } @Test diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java index 0e0e36c433..2ed0f123bc 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ package org.springframework.web.reactive.resource; import java.io.StringWriter; -import java.nio.CharBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -88,9 +87,8 @@ public class CssLinkResourceTransformer extends ResourceTransformerSupport { .read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE); return DataBufferUtils.join(flux) .flatMap(dataBuffer -> { - CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer()); + String cssContent = dataBuffer.toString(DEFAULT_CHARSET); DataBufferUtils.release(dataBuffer); - String cssContent = charBuffer.toString(); return transformContent(cssContent, outputResource, transformerChain, exchange); }); }); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index ae303f2907..78412ddc55 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,12 +20,14 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.WriteCallback; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -84,24 +86,28 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession remote.sendBytes(byteBuffer, new SendProcessorCallback()); + case PING -> remote.sendPing(byteBuffer); + case PONG -> remote.sendPong(byteBuffer); + default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType()); + } + } + } } return true; } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java index e93dbf5fb8..9849ab587e 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,12 +22,14 @@ import java.nio.charset.StandardCharsets; import jakarta.websocket.CloseReason; import jakarta.websocket.CloseReason.CloseCodes; +import jakarta.websocket.RemoteEndpoint; import jakarta.websocket.SendHandler; import jakarta.websocket.SendResult; import jakarta.websocket.Session; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.web.reactive.socket.CloseStatus; @@ -73,24 +75,28 @@ public class StandardWebSocketSession extends AbstractListenerWebSocketSession remote.sendBinary(byteBuffer, new SendProcessorCallback()); + case PING -> remote.sendPing(byteBuffer); + case PONG -> remote.sendPong(byteBuffer); + default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType()); + } + } + } } return true; } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index 9945be92dd..b80d4c1e3b 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,26 +76,26 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession WebSockets.sendBinary(byteBuffer, channel, new SendProcessorCallback(dataBuffer)); + case PING -> WebSockets.sendPing(byteBuffer, channel, new SendProcessorCallback(dataBuffer)); + case PONG -> WebSockets.sendPong(byteBuffer, channel, new SendProcessorCallback(dataBuffer)); + default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType()); + } + } + } } return true; }