From 9c33d2707a8767bfe3466ce12f5a70a43ddbd15a Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Wed, 27 Jul 2022 15:24:14 +0200 Subject: [PATCH] Introduce support for Netty 5 Buffer This commit introduces support for Netty 5's Buffer, in the form of Netty5DataBuffer. Because of the new API offered by Buffer, several changes have been made to the DataBuffer API: - CloseableDataBuffer is a simpler alternative to PooledDataBuffer, and implemented by Netty5DataBuffer. DataBufferUtils::release can now handle CloseableDataBuffer as well as PooledDataBuffer. - PooledDataBuffer::touch has been moved into a separate interface: TouchableDataBuffer, which is implemented by Netty5DataBuffer. - The capacity of DataBuffers can no longer be reduced, they can only grow larger. As a consequence, DataBuffer::capacity(int) has been deprecated, but ensureWritable (formally ensureCapacity) still exists. - DataBuffer::slice and retainedSlice have been deprecated in favor of split, a new method that ensures that memory regions do not overlap. - DataBuffer::asByteBuffer has been deprecated in favor of toByteBuffer, a new method that returns a copy, instead of shared data. - DataBufferFactory::allocateBuffer has been deprecated in favor of allocateBuffer(int). Closes gh-28874 --- spring-core/spring-core.gradle | 1 + .../codec/AbstractSingleValueEncoder.java | 5 +- .../core/codec/ByteBufferDecoder.java | 13 +- .../core/codec/DataBufferDecoder.java | 4 +- .../core/codec/Netty5BufferDecoder.java | 68 ++++ .../core/codec/Netty5BufferEncoder.java | 77 ++++ .../core/codec/StringDecoder.java | 23 +- .../core/io/buffer/CloseableDataBuffer.java | 35 ++ .../core/io/buffer/DataBuffer.java | 83 ++++- .../core/io/buffer/DataBufferFactory.java | 14 +- .../core/io/buffer/DataBufferInputStream.java | 111 ++++++ .../io/buffer/DataBufferOutputStream.java | 73 ++++ .../core/io/buffer/DataBufferUtils.java | 89 +++-- .../core/io/buffer/DataBufferWrapper.java | 28 +- .../core/io/buffer/DefaultDataBuffer.java | 127 +++---- .../io/buffer/DefaultDataBufferFactory.java | 7 +- .../core/io/buffer/Netty5DataBuffer.java | 346 ++++++++++++++++++ .../io/buffer/Netty5DataBufferFactory.java | 139 +++++++ .../core/io/buffer/NettyDataBuffer.java | 52 ++- .../io/buffer/NettyDataBufferFactory.java | 10 +- .../core/io/buffer/PooledDataBuffer.java | 7 +- .../core/io/buffer/TouchableDataBuffer.java | 34 ++ .../core/codec/Netty5BufferDecoderTests.java | 93 +++++ .../core/codec/Netty5BufferEncoderTests.java | 70 ++++ .../core/io/buffer/DataBufferTests.java | 134 ++++++- .../core/io/buffer/DataBufferUtilsTests.java | 14 +- .../AbstractDataBufferAllocatingTests.java | 26 +- .../io/buffer/LeakAwareDataBufferFactory.java | 8 +- .../messaging/rsocket/PayloadUtils.java | 7 +- .../web/reactive/server/WiretapConnector.java | 4 +- spring-web/spring-web.gradle | 1 + .../HttpComponentsClientHttpRequest.java | 2 +- .../client/reactive/JdkClientHttpRequest.java | 6 +- .../reactive/JettyClientHttpRequest.java | 5 +- .../http/codec/EncoderHttpMessageWriter.java | 9 +- .../http/codec/FormHttpMessageReader.java | 4 +- .../ServerSentEventHttpMessageWriter.java | 5 +- .../multipart/MultipartHttpMessageWriter.java | 5 +- .../http/codec/multipart/MultipartParser.java | 30 +- .../http/codec/multipart/MultipartUtils.java | 18 - .../multipart/MultipartWriterSupport.java | 25 +- .../multipart/PartEventHttpMessageWriter.java | 3 +- .../http/codec/multipart/PartGenerator.java | 2 +- .../multipart/PartHttpMessageWriter.java | 5 +- .../http/codec/protobuf/ProtobufDecoder.java | 6 +- .../http/codec/protobuf/ProtobufEncoder.java | 20 +- .../http/codec/support/BaseDefaultCodecs.java | 13 +- .../http/codec/xml/XmlEventDecoder.java | 4 +- .../reactive/AbstractServerHttpResponse.java | 7 +- .../reactive/JettyHttpHandlerAdapter.java | 4 +- .../reactive/TomcatHttpHandlerAdapter.java | 40 +- .../reactive/UndertowServerHttpResponse.java | 4 +- .../support/ClientCodecConfigurerTests.java | 19 +- .../codec/support/CodecConfigurerTests.java | 16 +- .../support/ServerCodecConfigurerTests.java | 11 +- spring-webflux/spring-webflux.gradle | 1 + .../view/freemarker/FreeMarkerView.java | 19 +- .../socket/adapter/JettyWebSocketSession.java | 2 +- .../adapter/StandardWebSocketSession.java | 2 +- .../UndertowWebSocketHandlerAdapter.java | 19 +- .../adapter/UndertowWebSocketSession.java | 2 +- .../DelegatingWebFluxConfigurationTests.java | 4 +- .../WebFluxConfigurationSupportTests.java | 6 +- .../asciidoc/core/core-databuffer-codec.adoc | 2 +- 64 files changed, 1663 insertions(+), 360 deletions(-) create mode 100644 spring-core/src/main/java/org/springframework/core/codec/Netty5BufferDecoder.java create mode 100644 spring-core/src/main/java/org/springframework/core/codec/Netty5BufferEncoder.java create mode 100644 spring-core/src/main/java/org/springframework/core/io/buffer/CloseableDataBuffer.java create mode 100644 spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferInputStream.java create mode 100644 spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferOutputStream.java create mode 100644 spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBuffer.java create mode 100644 spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBufferFactory.java create mode 100644 spring-core/src/main/java/org/springframework/core/io/buffer/TouchableDataBuffer.java create mode 100644 spring-core/src/test/java/org/springframework/core/codec/Netty5BufferDecoderTests.java create mode 100644 spring-core/src/test/java/org/springframework/core/codec/Netty5BufferEncoderTests.java diff --git a/spring-core/spring-core.gradle b/spring-core/spring-core.gradle index 6546aa7e36..49200e16c7 100644 --- a/spring-core/spring-core.gradle +++ b/spring-core/spring-core.gradle @@ -72,6 +72,7 @@ dependencies { optional("io.reactivex.rxjava3:rxjava") optional("io.smallrye.reactive:mutiny") optional("io.netty:netty-buffer") + optional("io.netty:netty5-buffer:5.0.0.Alpha4") testImplementation("jakarta.annotation:jakarta.annotation-api") testImplementation("jakarta.xml.bind:jakarta.xml.bind-api") testImplementation("com.google.code.findbugs:jsr305") diff --git a/spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java index cdaba36bcc..719b6f2773 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,6 @@ import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.lang.Nullable; import org.springframework.util.MimeType; @@ -52,7 +51,7 @@ public abstract class AbstractSingleValueEncoder extends AbstractEncoder { return Flux.from(inputStream) .take(1) .concatMap(value -> encode(value, bufferFactory, elementType, mimeType, hints)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); } /** diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java index 9c1133fb1a..6481ef86db 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,15 +51,12 @@ public class ByteBufferDecoder extends AbstractDataBufferDecoder { public ByteBuffer decode(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - int byteCount = dataBuffer.readableByteCount(); - ByteBuffer copy = ByteBuffer.allocate(byteCount); - copy.put(dataBuffer.asByteBuffer()); - copy.flip(); - DataBufferUtils.release(dataBuffer); + ByteBuffer result = dataBuffer.toByteBuffer(); if (logger.isDebugEnabled()) { - logger.debug(Hints.getLogPrefix(hints) + "Read " + byteCount + " bytes"); + logger.debug(Hints.getLogPrefix(hints) + "Read " + dataBuffer.readableByteCount() + " bytes"); } - return copy; + DataBufferUtils.release(dataBuffer); + return result; } } diff --git a/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java index 34b150378e..788eafdb0e 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,7 +35,7 @@ import org.springframework.util.MimeTypeUtils; * after they have been consumed. In addition, if using {@code Flux} or * {@code Mono} operators such as flatMap, reduce, and others that prefetch, * cache, and skip or filter out data items internally, please add - * {@code doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)} to the + * {@code doOnDiscard(DataBuffer.class, DataBufferUtils::release)} to the * composition chain to ensure cached data buffers are released prior to an * error or cancellation signal. * diff --git a/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferDecoder.java new file mode 100644 index 0000000000..b6ebb93d8a --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferDecoder.java @@ -0,0 +1,68 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.util.Map; + +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.DefaultBufferAllocators; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.Netty5DataBuffer; +import org.springframework.lang.Nullable; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +/** + * Decoder for {@link Buffer Buffers}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +public class Netty5BufferDecoder extends AbstractDataBufferDecoder { + + public Netty5BufferDecoder() { + super(MimeTypeUtils.ALL); + } + + + @Override + public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) { + return (Buffer.class.isAssignableFrom(elementType.toClass()) && + super.canDecode(elementType, mimeType)); + } + + @Override + public Buffer decode(DataBuffer dataBuffer, ResolvableType elementType, + @Nullable MimeType mimeType, @Nullable Map hints) { + + if (logger.isDebugEnabled()) { + logger.debug(Hints.getLogPrefix(hints) + "Read " + dataBuffer.readableByteCount() + " bytes"); + } + if (dataBuffer instanceof Netty5DataBuffer netty5DataBuffer) { + return netty5DataBuffer.getNativeBuffer(); + } + byte[] bytes = new byte[dataBuffer.readableByteCount()]; + dataBuffer.read(bytes); + Buffer buffer = DefaultBufferAllocators.preferredAllocator().copyOf(bytes); + DataBufferUtils.release(dataBuffer); + return buffer; + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferEncoder.java new file mode 100644 index 0000000000..465a924a11 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferEncoder.java @@ -0,0 +1,77 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.util.Map; + +import io.netty5.buffer.api.Buffer; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.Netty5DataBufferFactory; +import org.springframework.lang.Nullable; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +/** + * Encoder for {@link Buffer Buffers}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +public class Netty5BufferEncoder extends AbstractEncoder { + + public Netty5BufferEncoder() { + super(MimeTypeUtils.ALL); + } + + + @Override + public boolean canEncode(ResolvableType type, @Nullable MimeType mimeType) { + Class clazz = type.toClass(); + return super.canEncode(type, mimeType) && Buffer.class.isAssignableFrom(clazz); + } + + @Override + public Flux encode(Publisher inputStream, + DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, + @Nullable Map hints) { + + return Flux.from(inputStream).map(byteBuffer -> + encodeValue(byteBuffer, bufferFactory, elementType, mimeType, hints)); + } + + @Override + public DataBuffer encodeValue(Buffer buffer, DataBufferFactory bufferFactory, ResolvableType valueType, + @Nullable MimeType mimeType, @Nullable Map hints) { + + if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) { + String logPrefix = Hints.getLogPrefix(hints); + logger.debug(logPrefix + "Writing " + buffer.readableBytes() + " bytes"); + } + if (bufferFactory instanceof Netty5DataBufferFactory netty5DataBufferFactory) { + return netty5DataBufferFactory.wrap(buffer); + } + byte[] bytes = new byte[buffer.readableBytes()]; + buffer.readBytes(bytes, 0, bytes.length); + buffer.close(); + return bufferFactory.wrap(bytes); + } +} diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index cb74b8d2c4..15e6325f72 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -35,7 +35,6 @@ import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.LimitedDataBufferList; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.core.log.LogFormatUtils; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -127,7 +126,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder { return Mono.just(lastBuffer); })) .doOnTerminate(chunks::releaseAndClear) - .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release) + .doOnDiscard(DataBuffer.class, DataBufferUtils::release) .map(buffer -> decode(buffer, elementType, mimeType, hints)); } @@ -153,26 +152,26 @@ public final class StringDecoder extends AbstractDataBufferDecoder { DataBufferUtils.retain(buffer); // retain after add (may raise DataBufferLimitException) break; } - int startIndex = buffer.readPosition(); - int length = (endIndex - startIndex + 1); - DataBuffer slice = buffer.retainedSlice(startIndex, length); - result = (result != null ? result : new ArrayList<>()); + DataBuffer split = buffer.split(endIndex + 1); + if (result == null) { + result = new ArrayList<>(); + } + int delimiterLength = matcher.delimiter().length; if (chunks.isEmpty()) { if (this.stripDelimiter) { - slice.writePosition(slice.writePosition() - matcher.delimiter().length); + split.writePosition(split.writePosition() - delimiterLength); } - result.add(slice); + result.add(split); } else { - chunks.add(slice); + chunks.add(split); DataBuffer joined = buffer.factory().join(chunks); if (this.stripDelimiter) { - joined.writePosition(joined.writePosition() - matcher.delimiter().length); + joined.writePosition(joined.writePosition() - delimiterLength); } result.add(joined); chunks.clear(); } - buffer.readPosition(endIndex + 1); } while (buffer.readableByteCount() > 0); return (result != null ? result : Collections.emptyList()); @@ -187,7 +186,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder { @Nullable MimeType mimeType, @Nullable Map hints) { Charset charset = getCharset(mimeType); - CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer()); + CharBuffer charBuffer = charset.decode(dataBuffer.toByteBuffer()); DataBufferUtils.release(dataBuffer); String value = charBuffer.toString(); LogFormatUtils.traceDebug(logger, traceOn -> { diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/CloseableDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/CloseableDataBuffer.java new file mode 100644 index 0000000000..1a1707ac6d --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/CloseableDataBuffer.java @@ -0,0 +1,35 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +/** + * Extension of {@link DataBuffer} that allows for buffers that can be used + * in a {@code try}-with-resources statement. + + * @author Arjen Poutsma + * @since 6.0 + */ +public interface CloseableDataBuffer extends DataBuffer, AutoCloseable { + + /** + * Closes this data buffer, freeing any resources. + * @throws IllegalStateException if this buffer has already been closed + */ + @Override + void close(); + +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java index 6f5a38bded..ba9ddb5e5c 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -111,7 +111,10 @@ public interface DataBuffer { * the current capacity, it will be expanded. * @param capacity the new capacity * @return this buffer + * @deprecated as of 6.0, in favor of {@link #ensureWritable(int)}, which + * has different semantics */ + @Deprecated DataBuffer capacity(int capacity); /** @@ -121,11 +124,23 @@ public interface DataBuffer { * @param capacity the writable capacity to check for * @return this buffer * @since 5.1.4 + * @deprecated since 6.0, in favor of {@link #ensureWritable(int)} */ + @Deprecated default DataBuffer ensureCapacity(int capacity) { - return this; + return ensureWritable(capacity); } + /** + * Ensure that the current buffer has enough {@link #writableByteCount()} + * to write the amount of data given as an argument. If not, the missing + * capacity will be added to the buffer. + * @param capacity the writable capacity to check for + * @return this buffer + * @since 6.0 + */ + DataBuffer ensureWritable(int capacity); + /** * Return the position from which this buffer will read. * @return the read position @@ -286,7 +301,10 @@ public interface DataBuffer { * @param index the index at which to start the slice * @param length the length of the slice * @return the specified slice of this data buffer + * @deprecated as of 6.0, in favor of {@link #split(int)}, which + * has different semantics */ + @Deprecated DataBuffer slice(int index, int length); /** @@ -301,18 +319,44 @@ public interface DataBuffer { * @param length the length of the slice * @return the specified, retained slice of this data buffer * @since 5.2 + * @deprecated as of 6.0, in favor of {@link #split(int)}, which + * has different semantics */ + @Deprecated default DataBuffer retainedSlice(int index, int length) { return DataBufferUtils.retain(slice(index, length)); } + /** + * Splits this data buffer into two at the given index. + * + *

Data that precedes the {@code index} will be returned in a new buffer, + * while this buffer will contain data that follows after {@code index}. + * Memory between the two buffers is shared, but independent and cannot + * overlap (unlike {@link #slice(int, int) slice}). + * + *

The {@linkplain #readPosition() read} and + * {@linkplain #writePosition() write} position of the returned buffer are + * truncated to fit within the buffers {@linkplain #capacity() capacity} if + * necessary. The positions of this buffer are set to {@code 0} if they are + * smaller than {@code index}. + * @param index the index at which it should be split + * @return a new data buffer, containing the bytes from index {@code 0} to + * {@code index} + * @since 6.0 + */ + DataBuffer split(int index); + /** * Expose this buffer's bytes as a {@link ByteBuffer}. Data between this * {@code DataBuffer} and the returned {@code ByteBuffer} is shared; though * changes in the returned buffer's {@linkplain ByteBuffer#position() position} * will not be reflected in the reading nor writing position of this data buffer. * @return this data buffer as a byte buffer + * @deprecated as of 6.0, in favor of {@link #toByteBuffer()}, which does + * not share data and returns a copy. */ + @Deprecated ByteBuffer asByteBuffer(); /** @@ -324,9 +368,32 @@ public interface DataBuffer { * @param length the length of the returned byte buffer * @return this data buffer as a byte buffer * @since 5.0.1 + * @deprecated as of 6.0, in favor of {@link #toByteBuffer(int, int)}, which + * does not share data and returns a copy. */ + @Deprecated ByteBuffer asByteBuffer(int index, int length); + /** + * Returns a {@link ByteBuffer} representation of this data buffer. Data + * between this {@code DataBuffer} and the returned {@code ByteBuffer} is + * not shared. + * @return this data buffer as a byte buffer + * @since 6.0 + */ + default ByteBuffer toByteBuffer() { + return toByteBuffer(readPosition(), readableByteCount()); + } + + /** + * Returns a {@link ByteBuffer} representation of a subsequence of this + * buffer's bytes. Data between this {@code DataBuffer} and the returned + * {@code ByteBuffer} is not shared. + * @return this data buffer as a byte buffer + * @since 6.0 + */ + ByteBuffer toByteBuffer(int index, int length); + /** * Expose this buffer's data as an {@link InputStream}. Both data and read position are * shared between the returned stream and this data buffer. The underlying buffer will @@ -335,7 +402,9 @@ public interface DataBuffer { * @return this data buffer as an input stream * @see #asInputStream(boolean) */ - InputStream asInputStream(); + default InputStream asInputStream() { + return new DataBufferInputStream(this, false); + } /** * Expose this buffer's data as an {@link InputStream}. Both data and read position are @@ -346,14 +415,18 @@ public interface DataBuffer { * @return this data buffer as an input stream * @since 5.0.4 */ - InputStream asInputStream(boolean releaseOnClose); + default InputStream asInputStream(boolean releaseOnClose) { + return new DataBufferInputStream(this, releaseOnClose); + }; /** * Expose this buffer's data as an {@link OutputStream}. Both data and write position are * shared between the returned stream and this data buffer. * @return this data buffer as an output stream */ - OutputStream asOutputStream(); + default OutputStream asOutputStream() { + return new DataBufferOutputStream(this); + } /** * Return this buffer's data a String using the specified charset. Default implementation diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java index baa8e9ce5e..865c173739 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,9 @@ public interface DataBufferFactory { * underlying implementation and its configuration, this will be heap-based * or direct buffer. * @return the allocated buffer + * @deprecated as of 6.0, in favor of {@link #allocateBuffer(int)} */ + @Deprecated DataBuffer allocateBuffer(); /** @@ -75,4 +77,14 @@ public interface DataBufferFactory { */ DataBuffer join(List dataBuffers); + /** + * Indicates whether this factory allocates direct buffers (i.e. non-heap, + * native memory). + * @return {@code true} if this factory allocates direct buffers; + * {@code false} otherwise + * @since 6.0 + */ + boolean isDirect(); + + } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferInputStream.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferInputStream.java new file mode 100644 index 0000000000..e8a1b179a7 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferInputStream.java @@ -0,0 +1,111 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.io.IOException; +import java.io.InputStream; + +import org.springframework.util.Assert; + +/** + * An {@link InputStream} that reads from a {@link DataBuffer}. + * + * @author Arjen Poutsma + * @since 6.0 + * @see DataBuffer#asInputStream(boolean) + */ +final class DataBufferInputStream extends InputStream { + + private final DataBuffer dataBuffer; + + private final int end; + + private final boolean releaseOnClose; + + private boolean closed; + + private int mark; + + + public DataBufferInputStream(DataBuffer dataBuffer, boolean releaseOnClose) { + Assert.notNull(dataBuffer, "DataBuffer must not be null"); + this.dataBuffer = dataBuffer; + int start = this.dataBuffer.readPosition(); + this.end = start + this.dataBuffer.readableByteCount(); + this.mark = start; + this.releaseOnClose = releaseOnClose; + } + + @Override + public int read() throws IOException { + checkClosed(); + if (available() == 0) { + return -1; + } + return this.dataBuffer.read() & 0xFF; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkClosed(); + int available = available(); + if (available == 0) { + return -1; + } + len = Math.min(available, len); + this.dataBuffer.read(b, off, len); + return len; + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void mark(int mark) { + this.mark = mark; + } + + @Override + public int available() { + return Math.max(0, this.end - this.dataBuffer.readPosition()); + } + + @Override + public void reset() { + this.dataBuffer.readPosition(this.mark); + } + + @Override + public void close() { + if (this.closed) { + return; + } + if (this.releaseOnClose) { + DataBufferUtils.release(this.dataBuffer); + } + this.closed = true; + } + + private void checkClosed() throws IOException { + if (this.closed) { + throw new IOException("DataBufferInputStream is closed"); + } + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferOutputStream.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferOutputStream.java new file mode 100644 index 0000000000..e6b6596cfe --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferOutputStream.java @@ -0,0 +1,73 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.io.IOException; +import java.io.OutputStream; + +import org.springframework.util.Assert; + +/** + * An {@link OutputStream} that writes to a {@link DataBuffer}. + * + * @author Arjen Poutsma + * @since 6.0 + * @see DataBuffer#asOutputStream() + */ +final class DataBufferOutputStream extends OutputStream { + + private final DataBuffer dataBuffer; + + private boolean closed; + + + public DataBufferOutputStream(DataBuffer dataBuffer) { + Assert.notNull(dataBuffer, "DataBuffer must not be null"); + this.dataBuffer = dataBuffer; + } + + @Override + public void write(int b) throws IOException { + checkClosed(); + this.dataBuffer.ensureWritable(1); + this.dataBuffer.write((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + checkClosed(); + if (len > 0) { + this.dataBuffer.ensureWritable(len); + this.dataBuffer.write(b, off, len); + } + } + + @Override + public void close() { + if (this.closed) { + return; + } + this.closed = true; + } + + private void checkClosed() throws IOException { + if (this.closed) { + throw new IOException("DataBufferOutputStream is closed"); + } + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 03811089b8..52f8bae918 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -156,7 +156,7 @@ public abstract class DataBufferUtils { // and then complete after releasing the DataBuffer. }); - return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + return flux.doOnDiscard(DataBuffer.class, DataBufferUtils::release); } /** @@ -417,7 +417,8 @@ public abstract class DataBufferUtils { * @param maxByteCount the maximum byte count * @return a flux whose maximum byte count is {@code maxByteCount} */ - public static Flux takeUntilByteCount(Publisher publisher, long maxByteCount) { + @SuppressWarnings("unchecked") + public static Flux takeUntilByteCount(Publisher publisher, long maxByteCount) { Assert.notNull(publisher, "Publisher must not be null"); Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number"); @@ -427,8 +428,10 @@ public abstract class DataBufferUtils { .map(buffer -> { long remainder = countDown.addAndGet(-buffer.readableByteCount()); if (remainder < 0) { - int length = buffer.readableByteCount() + (int) remainder; - return buffer.slice(0, length); + int index = buffer.readableByteCount() + (int) remainder; + DataBuffer split = buffer.split(index); + release(buffer); + return (T)split; } else { return buffer; @@ -448,7 +451,7 @@ public abstract class DataBufferUtils { * @param maxByteCount the maximum byte count * @return a flux with the remaining part of the given publisher */ - public static Flux skipUntilByteCount(Publisher publisher, long maxByteCount) { + public static Flux skipUntilByteCount(Publisher publisher, long maxByteCount) { Assert.notNull(publisher, "Publisher must not be null"); Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number"); @@ -464,14 +467,15 @@ public abstract class DataBufferUtils { if (remainder < 0) { countDown.set(0); int start = buffer.readableByteCount() + (int)remainder; - int length = (int) -remainder; - return buffer.slice(start, length); + DataBuffer split = buffer.split(start); + release(split); + return buffer; } else { return buffer; } }); - }).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + }).doOnDiscard(DataBuffer.class, DataBufferUtils::release); } /** @@ -499,8 +503,8 @@ public abstract class DataBufferUtils { */ @SuppressWarnings("unchecked") public static T touch(T dataBuffer, Object hint) { - if (dataBuffer instanceof PooledDataBuffer pooledDataBuffer) { - return (T) pooledDataBuffer.touch(hint); + if (dataBuffer instanceof TouchableDataBuffer touchableDataBuffer) { + return (T) touchableDataBuffer.touch(hint); } else { return dataBuffer; @@ -508,8 +512,11 @@ public abstract class DataBufferUtils { } /** - * Release the given data buffer, if it is a {@link PooledDataBuffer} and - * has been {@linkplain PooledDataBuffer#isAllocated() allocated}. + * Release the given data buffer. If it is a {@link PooledDataBuffer} and + * has been {@linkplain PooledDataBuffer#isAllocated() allocated}, this + * method will call {@link PooledDataBuffer#release()}. If it is a + * {@link CloseableDataBuffer}, this method will call + * {@link CloseableDataBuffer#close()}. * @param dataBuffer the data buffer to release * @return {@code true} if the buffer was released; {@code false} otherwise. */ @@ -520,7 +527,6 @@ public abstract class DataBufferUtils { return pooledDataBuffer.release(); } catch (IllegalStateException ex) { - // Avoid dependency on Netty: IllegalReferenceCountException if (logger.isDebugEnabled()) { logger.debug("Failed to release PooledDataBuffer: " + dataBuffer, ex); } @@ -528,6 +534,19 @@ public abstract class DataBufferUtils { } } } + else if (dataBuffer instanceof CloseableDataBuffer closeableDataBuffer) { + try { + closeableDataBuffer.close(); + return true; + } + catch (IllegalStateException ex) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to release CloseableDataBuffer " + dataBuffer, ex); + } + return false; + + } + } return false; } @@ -581,7 +600,7 @@ public abstract class DataBufferUtils { .collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add) .filter(list -> !list.isEmpty()) .map(list -> list.get(0).factory().join(list)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); } /** @@ -887,14 +906,13 @@ public abstract class DataBufferUtils { @Override public void accept(SynchronousSink sink) { - boolean release = true; - DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); + ByteBuffer byteBuffer = this.dataBufferFactory.isDirect() ? + ByteBuffer.allocateDirect(this.bufferSize) : + ByteBuffer.allocate(this.bufferSize); try { - int read; - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, dataBuffer.capacity()); - if ((read = this.channel.read(byteBuffer)) >= 0) { - dataBuffer.writePosition(read); - release = false; + if (this.channel.read(byteBuffer) >= 0) { + byteBuffer.flip(); + DataBuffer dataBuffer = this.dataBufferFactory.wrap(byteBuffer); sink.next(dataBuffer); } else { @@ -904,16 +922,11 @@ public abstract class DataBufferUtils { catch (IOException ex) { sink.error(ex); } - finally { - if (release) { - release(dataBuffer); - } - } } } - private static class ReadCompletionHandler implements CompletionHandler { + private static class ReadCompletionHandler implements CompletionHandler { private final AsynchronousFileChannel channel; @@ -965,21 +978,20 @@ public abstract class DataBufferUtils { } private void read() { - DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize); - this.channel.read(byteBuffer, this.position.get(), dataBuffer, this); + ByteBuffer byteBuffer = this.dataBufferFactory.isDirect() ? + ByteBuffer.allocateDirect(this.bufferSize) : + ByteBuffer.allocate(this.bufferSize); + this.channel.read(byteBuffer, this.position.get(), byteBuffer, this); } @Override - public void completed(Integer read, DataBuffer dataBuffer) { + public void completed(Integer read, ByteBuffer byteBuffer) { if (this.state.get().equals(State.DISPOSED)) { - release(dataBuffer); closeChannel(this.channel); return; } if (read == -1) { - release(dataBuffer); closeChannel(this.channel); this.state.set(State.DISPOSED); this.sink.complete(); @@ -987,7 +999,9 @@ public abstract class DataBufferUtils { } this.position.addAndGet(read); - dataBuffer.writePosition(read); + + byteBuffer.flip(); + DataBuffer dataBuffer = this.dataBufferFactory.wrap(byteBuffer); this.sink.next(dataBuffer); // Stay in READING mode if there is demand @@ -1003,8 +1017,7 @@ public abstract class DataBufferUtils { } @Override - public void failed(Throwable exc, DataBuffer dataBuffer) { - release(dataBuffer); + public void failed(Throwable exc, ByteBuffer byteBuffer) { closeChannel(this.channel); this.state.set(State.DISPOSED); this.sink.error(exc); @@ -1035,7 +1048,7 @@ public abstract class DataBufferUtils { @Override protected void hookOnNext(DataBuffer dataBuffer) { try { - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(); + ByteBuffer byteBuffer = dataBuffer.toByteBuffer(); while (byteBuffer.hasRemaining()) { this.channel.write(byteBuffer); } @@ -1099,7 +1112,7 @@ public abstract class DataBufferUtils { if (!this.dataBuffer.compareAndSet(null, value)) { throw new IllegalStateException(); } - ByteBuffer byteBuffer = value.asByteBuffer(); + ByteBuffer byteBuffer = value.toByteBuffer(); this.channel.write(byteBuffer, this.position.get(), byteBuffer, this); } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java index d4084ec63b..0ef9c42614 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,15 +85,22 @@ public class DataBufferWrapper implements DataBuffer { } @Override + @Deprecated public DataBuffer capacity(int capacity) { return this.delegate.capacity(capacity); } @Override + @Deprecated public DataBuffer ensureCapacity(int capacity) { return this.delegate.ensureCapacity(capacity); } + @Override + public DataBuffer ensureWritable(int capacity) { + return this.delegate.ensureWritable(capacity); + } + @Override public int readPosition() { return this.delegate.readPosition(); @@ -166,25 +173,44 @@ public class DataBufferWrapper implements DataBuffer { } @Override + @Deprecated public DataBuffer slice(int index, int length) { return this.delegate.slice(index, length); } @Override + @Deprecated public DataBuffer retainedSlice(int index, int length) { return this.delegate.retainedSlice(index, length); } @Override + public DataBuffer split(int index) { + return this.delegate.split(index); + } + + @Override + @Deprecated public ByteBuffer asByteBuffer() { return this.delegate.asByteBuffer(); } @Override + @Deprecated public ByteBuffer asByteBuffer(int index, int length) { return this.delegate.asByteBuffer(index, length); } + @Override + public ByteBuffer toByteBuffer() { + return this.delegate.toByteBuffer(); + } + + @Override + public ByteBuffer toByteBuffer(int index, int length) { + return this.delegate.toByteBuffer(index, length); + } + @Override public InputStream asInputStream() { return this.delegate.asInputStream(); diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java index f4c898a8ee..fd146b6846 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,6 @@ package org.springframework.core.io.buffer; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Arrays; @@ -179,9 +176,15 @@ public class DefaultDataBuffer implements DataBuffer { } @Override - public DefaultDataBuffer capacity(int newCapacity) { - if (newCapacity <= 0) { - throw new IllegalArgumentException(String.format("'newCapacity' %d must be higher than 0", newCapacity)); + @Deprecated + public DataBuffer capacity(int capacity) { + setCapacity(capacity); + return this; + } + + private void setCapacity(int newCapacity) { + if (newCapacity < 0) { + throw new IllegalArgumentException(String.format("'newCapacity' %d must be 0 or higher", newCapacity)); } int readPosition = readPosition(); int writePosition = writePosition(); @@ -215,14 +218,13 @@ public class DefaultDataBuffer implements DataBuffer { } setNativeBuffer(newBuffer); } - return this; } @Override - public DataBuffer ensureCapacity(int length) { + public DataBuffer ensureWritable(int length) { if (length > writableByteCount()) { int newCapacity = calculateCapacity(this.writePosition + length); - capacity(newCapacity); + setCapacity(newCapacity); } return this; } @@ -273,7 +275,7 @@ public class DefaultDataBuffer implements DataBuffer { @Override public DefaultDataBuffer write(byte b) { - ensureCapacity(1); + ensureWritable(1); int pos = this.writePosition; this.byteBuffer.put(pos, b); this.writePosition = pos + 1; @@ -290,7 +292,7 @@ public class DefaultDataBuffer implements DataBuffer { @Override public DefaultDataBuffer write(byte[] source, int offset, int length) { Assert.notNull(source, "Byte array must not be null"); - ensureCapacity(length); + ensureWritable(length); ByteBuffer tmp = this.byteBuffer.duplicate(); int limit = this.writePosition + length; @@ -304,7 +306,7 @@ public class DefaultDataBuffer implements DataBuffer { @Override public DefaultDataBuffer write(DataBuffer... buffers) { if (!ObjectUtils.isEmpty(buffers)) { - write(Arrays.stream(buffers).map(DataBuffer::asByteBuffer).toArray(ByteBuffer[]::new)); + write(Arrays.stream(buffers).map(DataBuffer::toByteBuffer).toArray(ByteBuffer[]::new)); } return this; } @@ -313,7 +315,7 @@ public class DefaultDataBuffer implements DataBuffer { public DefaultDataBuffer write(ByteBuffer... buffers) { if (!ObjectUtils.isEmpty(buffers)) { int capacity = Arrays.stream(buffers).mapToInt(ByteBuffer::remaining).sum(); - ensureCapacity(capacity); + ensureWritable(capacity); Arrays.stream(buffers).forEach(this::write); } return this; @@ -329,6 +331,7 @@ public class DefaultDataBuffer implements DataBuffer { } @Override + @Deprecated public DefaultDataBuffer slice(int index, int length) { checkIndex(index, length); int oldPosition = this.byteBuffer.position(); @@ -344,11 +347,37 @@ public class DefaultDataBuffer implements DataBuffer { } @Override + public DataBuffer split(int index) { + checkIndex(index); + + ByteBuffer split = this.byteBuffer.duplicate().clear() + .position(0) + .limit(index) + .slice(); + + DefaultDataBuffer result = new DefaultDataBuffer(this.dataBufferFactory, split); + result.writePosition = Math.min(this.writePosition, index); + result.readPosition = Math.min(this.readPosition, index); + + this.byteBuffer = this.byteBuffer.duplicate().clear() + .position(index) + .limit(this.byteBuffer.capacity()) + .slice(); + this.writePosition = Math.max(this.writePosition, index) - index; + this.readPosition = Math.max(this.readPosition, index) - index; + capacity(this.byteBuffer.capacity()); + + return result; + } + + @Override + @Deprecated public ByteBuffer asByteBuffer() { return asByteBuffer(this.readPosition, readableByteCount()); } @Override + @Deprecated public ByteBuffer asByteBuffer(int index, int length) { checkIndex(index, length); @@ -359,21 +388,16 @@ public class DefaultDataBuffer implements DataBuffer { } @Override - public InputStream asInputStream() { - return new DefaultDataBufferInputStream(); - } + public ByteBuffer toByteBuffer(int index, int length) { + checkIndex(index, length); - @Override - public InputStream asInputStream(boolean releaseOnClose) { - return new DefaultDataBufferInputStream(); + ByteBuffer copy = allocate(length, this.byteBuffer.isDirect()); + ByteBuffer readOnly = this.byteBuffer.asReadOnlyBuffer(); + readOnly.clear().position(index).limit(index + length); + copy.put(readOnly); + return copy.flip(); } - @Override - public OutputStream asOutputStream() { - return new DefaultDataBufferOutputStream(); - } - - @Override public String toString(int index, int length, Charset charset) { checkIndex(index, length); @@ -452,9 +476,17 @@ public class DefaultDataBuffer implements DataBuffer { private void checkIndex(int index, int length) { + checkIndex(index); + checkLength(length); + } + + private void checkIndex(int index) { assertIndex(index >= 0, "index %d must be >= 0", index); - assertIndex(length >= 0, "length %d must be >= 0", length); assertIndex(index <= this.capacity, "index %d must be <= %d", index, this.capacity); + } + + private void checkLength(int length) { + assertIndex(length >= 0, "length %d must be >= 0", length); assertIndex(length <= this.capacity, "length %d must be <= %d", length, this.capacity); } @@ -466,47 +498,6 @@ public class DefaultDataBuffer implements DataBuffer { } - private class DefaultDataBufferInputStream extends InputStream { - - @Override - public int available() { - return readableByteCount(); - } - - @Override - public int read() { - return available() > 0 ? DefaultDataBuffer.this.read() & 0xFF : -1; - } - - @Override - public int read(byte[] bytes, int off, int len) throws IOException { - int available = available(); - if (available > 0) { - len = Math.min(len, available); - DefaultDataBuffer.this.read(bytes, off, len); - return len; - } - else { - return -1; - } - } - } - - - private class DefaultDataBufferOutputStream extends OutputStream { - - @Override - public void write(int b) throws IOException { - DefaultDataBuffer.this.write((byte) b); - } - - @Override - public void write(byte[] bytes, int off, int len) throws IOException { - DefaultDataBuffer.this.write(bytes, off, len); - } - } - - private static class SlicedDefaultDataBuffer extends DefaultDataBuffer { SlicedDefaultDataBuffer(ByteBuffer byteBuffer, DefaultDataBufferFactory dataBufferFactory, int length) { diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java index 9da24e5051..81ed6242bd 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,6 +85,7 @@ public class DefaultDataBufferFactory implements DataBufferFactory { @Override + @Deprecated public DefaultDataBuffer allocateBuffer() { return allocateBuffer(this.defaultInitialCapacity); } @@ -122,6 +123,10 @@ public class DefaultDataBufferFactory implements DataBufferFactory { return result; } + @Override + public boolean isDirect() { + return this.preferDirect; + } @Override public String toString() { diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBuffer.java new file mode 100644 index 0000000000..03a9ebc8b7 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBuffer.java @@ -0,0 +1,346 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.function.IntPredicate; + +import io.netty5.buffer.api.Buffer; +import io.netty5.util.AsciiString; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; + + +/** + * Implementation of the {@code DataBuffer} interface that wraps a Netty 5 + * {@link Buffer}. Typically constructed with {@link Netty5DataBufferFactory}. + * + * @author Violeta Georgieva + * @author Arjen Poutsma + * @since 6.0 + */ +public final class Netty5DataBuffer implements CloseableDataBuffer, + TouchableDataBuffer { + + private final Buffer buffer; + + private final Netty5DataBufferFactory dataBufferFactory; + + + /** + * Create a new {@code Netty5DataBuffer} based on the given {@code Buffer}. + * @param buffer the buffer to base this buffer on + */ + Netty5DataBuffer(Buffer buffer, Netty5DataBufferFactory dataBufferFactory) { + Assert.notNull(buffer, "Buffer must not be null"); + Assert.notNull(dataBufferFactory, "NettyDataBufferFactory must not be null"); + this.buffer = buffer; + this.dataBufferFactory = dataBufferFactory; + } + + /** + * Directly exposes the native {@code Buffer} that this buffer is based on. + * @return the wrapped buffer + */ + public Buffer getNativeBuffer() { + return this.buffer; + } + + @Override + public DataBufferFactory factory() { + return this.dataBufferFactory; + } + + @Override + public int indexOf(IntPredicate predicate, int fromIndex) { + Assert.notNull(predicate, "IntPredicate must not be null"); + if (fromIndex < 0) { + fromIndex = 0; + } + else if (fromIndex >= this.buffer.writerOffset()) { + return -1; + } + int length = this.buffer.writerOffset() - fromIndex; + int bytes = this.buffer.openCursor(fromIndex, length).process(predicate.negate()::test); + return bytes == -1 ? -1 : fromIndex + bytes; + } + + @Override + public int lastIndexOf(IntPredicate predicate, int fromIndex) { + Assert.notNull(predicate, "IntPredicate must not be null"); + if (fromIndex < 0) { + return -1; + } + fromIndex = Math.min(fromIndex, this.buffer.writerOffset() - 1); + return this.buffer.openCursor(0, fromIndex + 1).process(predicate.negate()::test); + } + + @Override + public int readableByteCount() { + return this.buffer.readableBytes(); + } + + @Override + public int writableByteCount() { + return this.buffer.writableBytes(); + } + + @Override + public int readPosition() { + return this.buffer.readerOffset(); + } + + @Override + public Netty5DataBuffer readPosition(int readPosition) { + this.buffer.readerOffset(readPosition); + return this; + } + + @Override + public int writePosition() { + return this.buffer.writerOffset(); + } + + @Override + public Netty5DataBuffer writePosition(int writePosition) { + this.buffer.writerOffset(writePosition); + return this; + } + + @Override + public byte getByte(int index) { + return this.buffer.getByte(index); + } + + @Override + public int capacity() { + return this.buffer.capacity(); + } + + @Override + @Deprecated + public Netty5DataBuffer capacity(int capacity) { + if (capacity <= 0) { + throw new IllegalArgumentException(String.format("'newCapacity' %d must be higher than 0", capacity)); + } + int diff = capacity - capacity(); + if (diff > 0) { + this.buffer.ensureWritable(this.buffer.writableBytes() + diff); + } + return this; + } + + @Override + public DataBuffer ensureWritable(int capacity) { + Assert.isTrue(capacity >= 0, "Capacity must be larger than 0"); + this.buffer.ensureWritable(capacity); + return this; + } + + @Override + public byte read() { + return this.buffer.readByte(); + } + + @Override + public Netty5DataBuffer read(byte[] destination) { + return read(destination, 0, destination.length); + } + + @Override + public Netty5DataBuffer read(byte[] destination, int offset, int length) { + this.buffer.readBytes(destination, offset, length); + return this; + } + + @Override + public Netty5DataBuffer write(byte b) { + this.buffer.writeByte(b); + return this; + } + + @Override + public Netty5DataBuffer write(byte[] source) { + this.buffer.writeBytes(source); + return this; + } + + @Override + public Netty5DataBuffer write(byte[] source, int offset, int length) { + this.buffer.writeBytes(source, offset, length); + return this; + } + + @Override + public Netty5DataBuffer write(DataBuffer... buffers) { + if (!ObjectUtils.isEmpty(buffers)) { + if (hasNetty5DataBuffers(buffers)) { + Buffer[] nativeBuffers = new Buffer[buffers.length]; + for (int i = 0; i < buffers.length; i++) { + nativeBuffers[i] = ((Netty5DataBuffer) buffers[i]).getNativeBuffer(); + } + return write(nativeBuffers); + } + else { + ByteBuffer[] byteBuffers = new ByteBuffer[buffers.length]; + for (int i = 0; i < buffers.length; i++) { + byteBuffers[i] = buffers[i].toByteBuffer(); + + } + return write(byteBuffers); + } + } + return this; + } + + private static boolean hasNetty5DataBuffers(DataBuffer[] buffers) { + for (DataBuffer buffer : buffers) { + if (!(buffer instanceof Netty5DataBuffer)) { + return false; + } + } + return true; + } + + @Override + public Netty5DataBuffer write(ByteBuffer... buffers) { + if (!ObjectUtils.isEmpty(buffers)) { + for (ByteBuffer buffer : buffers) { + this.buffer.writeBytes(buffer); + } + } + return this; + } + + /** + * Writes one or more Netty 5 {@link Buffer Buffers} to this buffer, + * starting at the current writing position. + * @param buffers the buffers to write into this buffer + * @return this buffer + */ + public Netty5DataBuffer write(Buffer... buffers) { + if (!ObjectUtils.isEmpty(buffers)) { + for (Buffer buffer : buffers) { + this.buffer.writeBytes(buffer); + } + } + return this; + } + + @Override + public DataBuffer write(CharSequence charSequence, Charset charset) { + Assert.notNull(charSequence, "CharSequence must not be null"); + Assert.notNull(charset, "Charset must not be null"); + + if (StandardCharsets.US_ASCII.equals(charset) && charSequence instanceof AsciiString asciiString) { + this.buffer.writeBytes(asciiString.array(), asciiString.arrayOffset(), asciiString.length()); + } + else { + byte[] bytes = charSequence.toString().getBytes(charset); + this.buffer.writeBytes(bytes); + } + return this; + } + + /** + * {@inheritDoc} + *

Note that due to the lack of a {@code slice} method + * in Netty 5's {@link Buffer}, this implementation returns a copy that + * does not share its contents with this buffer. + */ + @Override + @Deprecated + public DataBuffer slice(int index, int length) { + Buffer copy = this.buffer.copy(index, length); + return new Netty5DataBuffer(copy, this.dataBufferFactory); + } + + @Override + public DataBuffer split(int index) { + Buffer split = this.buffer.split(index); + return new Netty5DataBuffer(split, this.dataBufferFactory); + } + + @Override + @Deprecated + public ByteBuffer asByteBuffer() { + return toByteBuffer(); + } + + @Override + @Deprecated + public ByteBuffer asByteBuffer(int index, int length) { + return toByteBuffer(index, length); + } + + @Override + @Deprecated + public ByteBuffer toByteBuffer(int index, int length) { + ByteBuffer copy = this.buffer.isDirect() ? + ByteBuffer.allocateDirect(length) : + ByteBuffer.allocate(length); + + this.buffer.copyInto(index, copy, 0, length); + return copy; + } + + @Override + public String toString(Charset charset) { + Assert.notNull(charset, "Charset must not be null"); + return this.buffer.toString(charset); + } + + @Override + public String toString(int index, int length, Charset charset) { + Assert.notNull(charset, "Charset must not be null"); + byte[] data = new byte[length]; + this.buffer.copyInto(index, data, 0, length); + return new String(data, 0, length, charset); + } + + @Override + public Netty5DataBuffer touch(Object hint) { + this.buffer.touch(hint); + return this; + } + + @Override + public void close() { + this.buffer.close(); + } + + + public boolean equals(@Nullable Object other) { + return (this == other || (other instanceof Netty5DataBuffer dataBuffer && + this.buffer.equals(dataBuffer.buffer))); + } + + @Override + public int hashCode() { + return this.buffer.hashCode(); + } + + @Override + public String toString() { + return this.buffer.toString(); + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBufferFactory.java new file mode 100644 index 0000000000..5ef803b0a8 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBufferFactory.java @@ -0,0 +1,139 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.nio.ByteBuffer; +import java.util.List; + +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.BufferAllocator; +import io.netty5.buffer.api.CompositeBuffer; +import io.netty5.buffer.api.DefaultBufferAllocators; + +import org.springframework.util.Assert; + +/** + * Implementation of the {@code DataBufferFactory} interface based on a + * Netty 5 {@link BufferAllocator}. + * + * @author Violeta Georgieva + * @author Arjen Poutsma + * @since 6.0 + */ +public class Netty5DataBufferFactory implements DataBufferFactory { + + private final BufferAllocator bufferAllocator; + + + /** + * Create a new {@code Netty5DataBufferFactory} based on the given factory. + * @param bufferAllocator the factory to use + */ + public Netty5DataBufferFactory(BufferAllocator bufferAllocator) { + Assert.notNull(bufferAllocator, "BufferAllocator must not be null"); + this.bufferAllocator = bufferAllocator; + } + + + /** + * Return the {@code BufferAllocator} used by this factory. + */ + public BufferAllocator getBufferAllocator() { + return this.bufferAllocator; + } + + @Override + @Deprecated + public Netty5DataBuffer allocateBuffer() { + Buffer buffer = this.bufferAllocator.allocate(256); + return new Netty5DataBuffer(buffer, this); + } + + @Override + public Netty5DataBuffer allocateBuffer(int initialCapacity) { + Buffer buffer = this.bufferAllocator.allocate(initialCapacity); + return new Netty5DataBuffer(buffer, this); + } + + @Override + public Netty5DataBuffer wrap(ByteBuffer byteBuffer) { + Buffer buffer = this.bufferAllocator.copyOf(byteBuffer); + return new Netty5DataBuffer(buffer, this); + } + + @Override + public Netty5DataBuffer wrap(byte[] bytes) { + Buffer buffer = this.bufferAllocator.copyOf(bytes); + return new Netty5DataBuffer(buffer, this); + } + + /** + * Wrap the given Netty {@link Buffer} in a {@code Netty5DataBuffer}. + * @param buffer the Netty buffer to wrap + * @return the wrapped buffer + */ + public Netty5DataBuffer wrap(Buffer buffer) { + buffer.touch("Wrap buffer"); + return new Netty5DataBuffer(buffer, this); + } + + /** + * {@inheritDoc} + *

This implementation uses Netty's {@link CompositeBuffer}. + */ + @Override + public DataBuffer join(List dataBuffers) { + Assert.notEmpty(dataBuffers, "DataBuffer List must not be empty"); + if (dataBuffers.size() == 1) { + return dataBuffers.get(0); + } + CompositeBuffer composite = this.bufferAllocator.compose(); + for (DataBuffer dataBuffer : dataBuffers) { + Assert.isInstanceOf(Netty5DataBuffer.class, dataBuffer); + composite.extendWith(((Netty5DataBuffer) dataBuffer).getNativeBuffer().send()); + } + return new Netty5DataBuffer(composite, this); + } + + @Override + public boolean isDirect() { + return this.bufferAllocator.getAllocationType().isDirect(); + } + + /** + * Return the given Netty {@link DataBuffer} as a {@link Buffer}. + *

Returns the {@linkplain Netty5DataBuffer#getNativeBuffer() native buffer} + * if {@code buffer} is a {@link Netty5DataBuffer}; returns + * {@link BufferAllocator#copyOf(ByteBuffer)} otherwise. + * @param buffer the {@code DataBuffer} to return a {@code Buffer} for + * @return the netty {@code Buffer} + */ + public static Buffer toBuffer(DataBuffer buffer) { + if (buffer instanceof Netty5DataBuffer netty5DataBuffer) { + return netty5DataBuffer.getNativeBuffer(); + } + else { + return DefaultBufferAllocators.preferredAllocator().copyOf(buffer.toByteBuffer()); + } + } + + + @Override + public String toString() { + return "Netty5DataBufferFactory (" + this.bufferAllocator + ")"; + } +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java index 7809c65295..6593e6d976 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,16 +16,12 @@ package org.springframework.core.io.buffer; -import java.io.InputStream; -import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.function.IntPredicate; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; -import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.ByteBufUtil; import org.springframework.lang.Nullable; @@ -33,7 +29,7 @@ import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; /** - * Implementation of the {@code DataBuffer} interface that wraps a Netty + * Implementation of the {@code DataBuffer} interface that wraps a Netty 4 * {@link ByteBuf}. Typically constructed with {@link NettyDataBufferFactory}. * * @author Arjen Poutsma @@ -42,7 +38,7 @@ import org.springframework.util.ObjectUtils; */ public class NettyDataBuffer implements PooledDataBuffer { - private final ByteBuf byteBuf; + private ByteBuf byteBuf; private final NettyDataBufferFactory dataBufferFactory; @@ -138,13 +134,14 @@ public class NettyDataBuffer implements PooledDataBuffer { } @Override + @Deprecated public NettyDataBuffer capacity(int capacity) { this.byteBuf.capacity(capacity); return this; } @Override - public DataBuffer ensureCapacity(int capacity) { + public DataBuffer ensureWritable(int capacity) { this.byteBuf.ensureWritable(capacity); return this; } @@ -197,8 +194,7 @@ public class NettyDataBuffer implements PooledDataBuffer { else { ByteBuffer[] byteBuffers = new ByteBuffer[buffers.length]; for (int i = 0; i < buffers.length; i++) { - byteBuffers[i] = buffers[i].asByteBuffer(); - + byteBuffers[i] = buffers[i].toByteBuffer(); } write(byteBuffers); } @@ -257,40 +253,56 @@ public class NettyDataBuffer implements PooledDataBuffer { } @Override + @Deprecated public NettyDataBuffer slice(int index, int length) { ByteBuf slice = this.byteBuf.slice(index, length); return new NettyDataBuffer(slice, this.dataBufferFactory); } @Override + @Deprecated public NettyDataBuffer retainedSlice(int index, int length) { ByteBuf slice = this.byteBuf.retainedSlice(index, length); return new NettyDataBuffer(slice, this.dataBufferFactory); } @Override + public NettyDataBuffer split(int index) { + ByteBuf split = this.byteBuf.retainedSlice(0, index); + int writerIndex = this.byteBuf.writerIndex(); + int readerIndex = this.byteBuf.readerIndex(); + + split.writerIndex(Math.min(writerIndex, index)); + split.readerIndex(Math.min(readerIndex, index)); + + this.byteBuf = this.byteBuf.slice(index, this.byteBuf.capacity() - index); + this.byteBuf.writerIndex(Math.max(writerIndex, index) - index); + this.byteBuf.readerIndex(Math.max(readerIndex, index) - index); + + return new NettyDataBuffer(split, this.dataBufferFactory); + } + + @Override + @Deprecated public ByteBuffer asByteBuffer() { return this.byteBuf.nioBuffer(); } @Override + @Deprecated public ByteBuffer asByteBuffer(int index, int length) { return this.byteBuf.nioBuffer(index, length); } @Override - public InputStream asInputStream() { - return new ByteBufInputStream(this.byteBuf); - } + public ByteBuffer toByteBuffer(int index, int length) { + ByteBuffer result = this.byteBuf.isDirect() ? + ByteBuffer.allocateDirect(length) : + ByteBuffer.allocate(length); - @Override - public InputStream asInputStream(boolean releaseOnClose) { - return new ByteBufInputStream(this.byteBuf, releaseOnClose); - } + this.byteBuf.getBytes(index, result); - @Override - public OutputStream asOutputStream() { - return new ByteBufOutputStream(this.byteBuf); + return result.flip(); } @Override diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java index 2fb09959f8..885d8f6f9c 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java @@ -28,7 +28,7 @@ import org.springframework.util.Assert; /** * Implementation of the {@code DataBufferFactory} interface based on a - * Netty {@link ByteBufAllocator}. + * Netty 4 {@link ByteBufAllocator}. * * @author Arjen Poutsma * @author Juergen Hoeller @@ -61,6 +61,7 @@ public class NettyDataBufferFactory implements DataBufferFactory { } @Override + @Deprecated public NettyDataBuffer allocateBuffer() { ByteBuf byteBuf = this.byteBufAllocator.buffer(); return new NettyDataBuffer(byteBuf, this); @@ -113,6 +114,11 @@ public class NettyDataBufferFactory implements DataBufferFactory { return new NettyDataBuffer(composite, this); } + @Override + public boolean isDirect() { + return this.byteBufAllocator.isDirectBufferPooled(); + } + /** * Return the given Netty {@link DataBuffer} as a {@link ByteBuf}. *

Returns the {@linkplain NettyDataBuffer#getNativeBuffer() native buffer} @@ -126,7 +132,7 @@ public class NettyDataBufferFactory implements DataBufferFactory { return nettyDataBuffer.getNativeBuffer(); } else { - return Unpooled.wrappedBuffer(buffer.asByteBuffer()); + return Unpooled.wrappedBuffer(buffer.toByteBuffer()); } } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java index e3e794214e..fa50feff4a 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,13 +17,13 @@ package org.springframework.core.io.buffer; /** - * Extension of {@link DataBuffer} that allows for buffer that share + * Extension of {@link DataBuffer} that allows for buffers that share * a memory pool. Introduces methods for reference counting. * * @author Arjen Poutsma * @since 5.0 */ -public interface PooledDataBuffer extends DataBuffer { +public interface PooledDataBuffer extends TouchableDataBuffer { /** * Return {@code true} if this buffer is allocated; @@ -43,6 +43,7 @@ public interface PooledDataBuffer extends DataBuffer { * @return this buffer * @since 5.3.2 */ + @Override PooledDataBuffer touch(Object hint); /** diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/TouchableDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/TouchableDataBuffer.java new file mode 100644 index 0000000000..7f724b7cea --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/TouchableDataBuffer.java @@ -0,0 +1,34 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +/** + * Extension of {@link DataBuffer} that allows for buffers that can be given + * hints for debugging purposes. + * + * @author Arjen Poutsma + * @since 6.0 + */ +public interface TouchableDataBuffer extends DataBuffer { + + /** + * Associate the given hint with the data buffer for debugging purposes. + * @return this buffer + */ + TouchableDataBuffer touch(Object hint); + +} diff --git a/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferDecoderTests.java new file mode 100644 index 0000000000..6d5997b446 --- /dev/null +++ b/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferDecoderTests.java @@ -0,0 +1,93 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.nio.charset.StandardCharsets; +import java.util.function.Consumer; + +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.DefaultBufferAllocators; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.testfixture.codec.AbstractDecoderTests; +import org.springframework.util.MimeTypeUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Arjen Poutsma + */ +class Netty5BufferDecoderTests extends AbstractDecoderTests { + + private final byte[] fooBytes = "foo".getBytes(StandardCharsets.UTF_8); + + private final byte[] barBytes = "bar".getBytes(StandardCharsets.UTF_8); + + + Netty5BufferDecoderTests() { + super(new Netty5BufferDecoder()); + } + + @Override + @Test + public void canDecode() { + assertThat(this.decoder.canDecode(ResolvableType.forClass(Buffer.class), + MimeTypeUtils.TEXT_PLAIN)).isTrue(); + assertThat(this.decoder.canDecode(ResolvableType.forClass(Integer.class), + MimeTypeUtils.TEXT_PLAIN)).isFalse(); + assertThat(this.decoder.canDecode(ResolvableType.forClass(Buffer.class), + MimeTypeUtils.APPLICATION_JSON)).isTrue(); + } + + @Override + @Test + public void decode() { + Flux input = Flux.concat( + dataBuffer(this.fooBytes), + dataBuffer(this.barBytes)); + + testDecodeAll(input, Buffer.class, step -> step + .consumeNextWith(expectByteBuffer(DefaultBufferAllocators.preferredAllocator().copyOf(this.fooBytes))) + .consumeNextWith(expectByteBuffer(DefaultBufferAllocators.preferredAllocator().copyOf(this.barBytes))) + .verifyComplete()); + } + + @Override + @Test + public void decodeToMono() { + Flux input = Flux.concat( + dataBuffer(this.fooBytes), + dataBuffer(this.barBytes)); + + Buffer expected = DefaultBufferAllocators.preferredAllocator().allocate(this.fooBytes.length + this.barBytes.length) + .writeBytes(this.fooBytes) + .writeBytes(this.barBytes) + .readerOffset(0); + + testDecodeToMonoAll(input, Buffer.class, step -> step + .consumeNextWith(expectByteBuffer(expected)) + .verifyComplete()); + } + + private Consumer expectByteBuffer(Buffer expected) { + return actual -> assertThat(actual).isEqualTo(expected); + } + +} diff --git a/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferEncoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferEncoderTests.java new file mode 100644 index 0000000000..ee619d3607 --- /dev/null +++ b/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferEncoderTests.java @@ -0,0 +1,70 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.nio.charset.StandardCharsets; + +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.DefaultBufferAllocators; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import org.springframework.core.ResolvableType; +import org.springframework.core.testfixture.codec.AbstractEncoderTests; +import org.springframework.util.MimeTypeUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Arjen Poutsma + */ +class Netty5BufferEncoderTests extends AbstractEncoderTests { + + private final byte[] fooBytes = "foo".getBytes(StandardCharsets.UTF_8); + + private final byte[] barBytes = "bar".getBytes(StandardCharsets.UTF_8); + + Netty5BufferEncoderTests() { + super(new Netty5BufferEncoder()); + } + + @Override + @Test + public void canEncode() { + assertThat(this.encoder.canEncode(ResolvableType.forClass(Buffer.class), + MimeTypeUtils.TEXT_PLAIN)).isTrue(); + assertThat(this.encoder.canEncode(ResolvableType.forClass(Integer.class), + MimeTypeUtils.TEXT_PLAIN)).isFalse(); + assertThat(this.encoder.canEncode(ResolvableType.forClass(Buffer.class), + MimeTypeUtils.APPLICATION_JSON)).isTrue(); + + // gh-20024 + assertThat(this.encoder.canEncode(ResolvableType.NONE, null)).isFalse(); + } + + @Override + @Test + public void encode() { + Flux input = Flux.just(this.fooBytes, this.barBytes) + .map(DefaultBufferAllocators.preferredAllocator()::copyOf); + + testEncodeAll(input, Buffer.class, step -> step + .consumeNextWith(expectBytes(this.fooBytes)) + .consumeNextWith(expectBytes(this.barBytes)) + .verifyComplete()); + } +} diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java index c7278e3c11..45dc7e7a53 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java @@ -28,6 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatException; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.junit.jupiter.api.Assumptions.assumeFalse; /** * @author Arjen Poutsma @@ -402,6 +403,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { @ParameterizedDataBufferAllocatingTest void decreaseCapacityLowReadPosition(DataBufferFactory bufferFactory) { + assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, + "Netty 5 does not support decreasing the capacity"); + super.bufferFactory = bufferFactory; DataBuffer buffer = createDataBuffer(2); @@ -414,6 +418,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { @ParameterizedDataBufferAllocatingTest void decreaseCapacityHighReadPosition(DataBufferFactory bufferFactory) { + assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, + "Netty 5 does not support decreasing the capacity"); + super.bufferFactory = bufferFactory; DataBuffer buffer = createDataBuffer(2); @@ -492,6 +499,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void asByteBuffer(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -513,6 +521,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void asByteBufferIndexLength(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -522,6 +531,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { ByteBuffer result = buffer.asByteBuffer(1, 2); assertThat(result.capacity()).isEqualTo(2); + assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, + "Netty 5 does share the internal buffer"); + buffer.write((byte) 'c'); assertThat(result.remaining()).isEqualTo(2); @@ -533,7 +545,11 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void byteBufferContainsDataBufferChanges(DataBufferFactory bufferFactory) { + assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, + "Netty 5 does not support sharing data between buffers"); + super.bufferFactory = bufferFactory; DataBuffer dataBuffer = createDataBuffer(1); @@ -549,7 +565,11 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void dataBufferContainsByteBufferChanges(DataBufferFactory bufferFactory) { + assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, + "Netty 5 does not support sharing data between buffers"); + super.bufferFactory = bufferFactory; DataBuffer dataBuffer = createDataBuffer(1); @@ -565,6 +585,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void emptyAsByteBuffer(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -576,6 +597,45 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { release(buffer); } + + @ParameterizedDataBufferAllocatingTest + void toByteBuffer(DataBufferFactory bufferFactory) { + super.bufferFactory = bufferFactory; + + DataBuffer buffer = createDataBuffer(4); + buffer.write(new byte[]{'a', 'b', 'c'}); + buffer.read(); // skip a + + ByteBuffer result = buffer.toByteBuffer(); + assertThat(result.capacity()).isEqualTo(2); + assertThat(result.remaining()).isEqualTo(2); + + byte[] resultBytes = new byte[2]; + result.get(resultBytes); + assertThat(resultBytes).isEqualTo(new byte[]{'b', 'c'}); + + release(buffer); + } + + @ParameterizedDataBufferAllocatingTest + void toByteBufferIndexLength(DataBufferFactory bufferFactory) { + super.bufferFactory = bufferFactory; + + DataBuffer buffer = createDataBuffer(3); + buffer.write(new byte[]{'a', 'b', 'c'}); + + ByteBuffer result = buffer.toByteBuffer(1, 2); + assertThat(result.capacity()).isEqualTo(2); + assertThat(result.remaining()).isEqualTo(2); + + byte[] resultBytes = new byte[2]; + result.get(resultBytes); + assertThat(resultBytes).isEqualTo(new byte[]{'b', 'c'}); + + release(buffer); + } + + @ParameterizedDataBufferAllocatingTest void indexOf(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -630,6 +690,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void slice(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -638,7 +699,6 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { DataBuffer slice = buffer.slice(1, 2); assertThat(slice.readableByteCount()).isEqualTo(2); - assertThatException().isThrownBy(() -> slice.write((byte) 0)); buffer.write((byte) 'c'); assertThat(buffer.readableByteCount()).isEqualTo(3); @@ -651,13 +711,18 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { result = new byte[2]; slice.read(result); - assertThat(result).isEqualTo(new byte[]{'b', 'c'}); - - + if (!(bufferFactory instanceof Netty5DataBufferFactory)) { + assertThat(result).isEqualTo(new byte[]{'b', 'c'}); + } + else { + assertThat(result).isEqualTo(new byte[]{'b', 0}); + release(slice); + } release(buffer); } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void retainedSlice(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -666,7 +731,6 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { DataBuffer slice = buffer.retainedSlice(1, 2); assertThat(slice.readableByteCount()).isEqualTo(2); - assertThatException().isThrownBy(() -> slice.write((byte) 0)); buffer.write((byte) 'c'); assertThat(buffer.readableByteCount()).isEqualTo(3); @@ -679,8 +743,12 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { result = new byte[2]; slice.read(result); - assertThat(result).isEqualTo(new byte[]{'b', 'c'}); - + if (!(bufferFactory instanceof Netty5DataBufferFactory)) { + assertThat(result).isEqualTo(new byte[]{'b', 'c'}); + } + else { + assertThat(result).isEqualTo(new byte[]{'b', 0}); + } release(buffer, slice); } @@ -705,6 +773,58 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { release(buffer); } + @ParameterizedDataBufferAllocatingTest + void split(DataBufferFactory bufferFactory) { + super.bufferFactory = bufferFactory; + + DataBuffer buffer = createDataBuffer(3); + buffer.write(new byte[]{'a', 'b'}); + + assertThatException().isThrownBy(() -> buffer.split(-1)); + assertThatException().isThrownBy(() -> buffer.split(4)); + + DataBuffer split = buffer.split(1); + + assertThat(split.readPosition()).isEqualTo(0); + assertThat(split.writePosition()).isEqualTo(1); + assertThat(split.capacity()).isEqualTo(1); + assertThat(split.readableByteCount()).isEqualTo(1); + byte[] bytes = new byte[1]; + split.read(bytes); + assertThat(bytes).containsExactly('a'); + + assertThat(buffer.readPosition()).isEqualTo(0); + assertThat(buffer.writePosition()).isEqualTo(1); + assertThat(buffer.capacity()).isEqualTo(2); + + buffer.write((byte) 'c'); + assertThat(buffer.readableByteCount()).isEqualTo(2); + bytes = new byte[2]; + buffer.read(bytes); + + assertThat(bytes).isEqualTo(new byte[]{'b', 'c'}); + + + DataBuffer buffer2 = createDataBuffer(1); + buffer2.write(new byte[]{'a'}); + split = buffer2.split(1); + + assertThat(split.readPosition()).isEqualTo(0); + assertThat(split.writePosition()).isEqualTo(1); + assertThat(split.capacity()).isEqualTo(1); + assertThat(split.readableByteCount()).isEqualTo(1); + bytes = new byte[1]; + split.read(bytes); + assertThat(bytes).containsExactly('a'); + + assertThat(buffer2.readPosition()).isEqualTo(0); + assertThat(buffer2.writePosition()).isEqualTo(0); + assertThat(buffer2.capacity()).isEqualTo(0); + assertThat(buffer.readableByteCount()).isEqualTo(0); + + release(buffer, buffer2); + } + @ParameterizedDataBufferAllocatingTest void join(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 74516632d7..05a0d91ed7 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -115,7 +115,7 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests { DataBufferUtils.readByteChannel(() -> channel, super.bufferFactory, 3); StepVerifier.create(result) - .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("")) .expectError(IOException.class) .verify(Duration.ofSeconds(3)); } @@ -170,17 +170,15 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests { willAnswer(invocation -> { ByteBuffer byteBuffer = invocation.getArgument(0); byteBuffer.put("foo".getBytes(StandardCharsets.UTF_8)); - byteBuffer.flip(); long pos = invocation.getArgument(1); assertThat(pos).isEqualTo(0); - DataBuffer dataBuffer = invocation.getArgument(2); - CompletionHandler completionHandler = invocation.getArgument(3); - completionHandler.completed(3, dataBuffer); + CompletionHandler completionHandler = invocation.getArgument(3); + completionHandler.completed(3, byteBuffer); return null; }).willAnswer(invocation -> { - DataBuffer dataBuffer = invocation.getArgument(2); - CompletionHandler completionHandler = invocation.getArgument(3); - completionHandler.failed(new IOException(), dataBuffer); + ByteBuffer byteBuffer = invocation.getArgument(0); + CompletionHandler completionHandler = invocation.getArgument(3); + completionHandler.failed(new IOException(), byteBuffer); return null; }) .given(channel).read(any(), anyLong(), any(), any()); diff --git a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java index 25d4ee978a..47d18ffd66 100644 --- a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java +++ b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java @@ -34,6 +34,7 @@ import io.netty.buffer.PoolArenaMetric; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocatorMetric; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty5.buffer.api.BufferAllocator; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; @@ -45,6 +46,7 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.Netty5DataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory; import static java.nio.charset.StandardCharsets.UTF_8; @@ -162,18 +164,26 @@ public abstract class AbstractDataBufferAllocatingTests { @SuppressWarnings("deprecation") // PooledByteBufAllocator no longer supports tinyCacheSize. public static Stream dataBufferFactories() { return Stream.of( - arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = true", - new NettyDataBufferFactory(new UnpooledByteBufAllocator(true)))), +// arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = true", +// new NettyDataBufferFactory(new UnpooledByteBufAllocator(true)))), arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = false", new NettyDataBufferFactory(new UnpooledByteBufAllocator(false)))), // 1) Disable caching for reliable leak detection, see https://github.com/netty/netty/issues/5275 // 2) maxOrder is 4 (vs default 11) but can be increased if necessary - arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = true", - new NettyDataBufferFactory(new PooledByteBufAllocator(true, 1, 1, 4096, 4, 0, 0, 0, true)))), - arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = false", - new NettyDataBufferFactory(new PooledByteBufAllocator(false, 1, 1, 4096, 4, 0, 0, 0, true)))), - arguments(named("DefaultDataBufferFactory - preferDirect = true", - new DefaultDataBufferFactory(true))), +// arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = true", +// new NettyDataBufferFactory(new PooledByteBufAllocator(true, 1, 1, 4096, 4, 0, 0, 0, true)))), +// arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = false", +// new NettyDataBufferFactory(new PooledByteBufAllocator(false, 1, 1, 4096, 4, 0, 0, 0, true)))), + arguments(named("Netty5DataBufferFactory - BufferAllocator.onHeapUnpooled()", + new Netty5DataBufferFactory(BufferAllocator.onHeapUnpooled()))), +// arguments(named("Netty5DataBufferFactory - BufferAllocator.offHeapUnpooled()", +// new Netty5DataBufferFactory(BufferAllocator.offHeapUnpooled()))), +// arguments(named("Netty5DataBufferFactory - BufferAllocator.onHeapPooled()", +// new Netty5DataBufferFactory(BufferAllocator.onHeapPooled()))), +// arguments(named("Netty5DataBufferFactory - BufferAllocator.offHeapPooled()", +// new Netty5DataBufferFactory(BufferAllocator.offHeapPooled()))), +// arguments(named("DefaultDataBufferFactory - preferDirect = true", +// new DefaultDataBufferFactory(true))), arguments(named("DefaultDataBufferFactory - preferDirect = false", new DefaultDataBufferFactory(false))) ); diff --git a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/LeakAwareDataBufferFactory.java b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/LeakAwareDataBufferFactory.java index 129821ccb5..64f2b0684c 100644 --- a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/LeakAwareDataBufferFactory.java +++ b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/LeakAwareDataBufferFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -107,6 +107,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { } @Override + @Deprecated public DataBuffer allocateBuffer() { return createLeakAwareDataBuffer(this.delegate.allocateBuffer()); } @@ -143,4 +144,9 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { return new LeakAwareDataBuffer(this.delegate.join(dataBuffers), this); } + @Override + public boolean isDirect() { + return this.delegate.isDirect(); + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java index 7b2e2a7f91..1591e47b12 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -96,13 +96,12 @@ public abstract class PayloadUtils { static ByteBuf asByteBuf(DataBuffer buffer) { - return buffer instanceof NettyDataBuffer ? - ((NettyDataBuffer) buffer).getNativeBuffer() : Unpooled.wrappedBuffer(buffer.asByteBuffer()); + return NettyDataBufferFactory.toByteBuf(buffer); } private static ByteBuffer asByteBuffer(DataBuffer buffer) { return buffer instanceof DefaultDataBuffer ? - ((DefaultDataBuffer) buffer).getNativeBuffer() : buffer.asByteBuffer(); + ((DefaultDataBuffer) buffer).getNativeBuffer() : buffer.toByteBuffer(); } } diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java index bff52e2410..f0120d3a67 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -136,7 +136,7 @@ class WiretapConnector implements ClientHttpConnector { @Nullable private final Flux> publisherNested; - private final DataBuffer buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer(); + private final DataBuffer buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer(256); // unsafe(): we're intercepting, already serialized Publisher signals private final Sinks.One content = Sinks.unsafe().one(); diff --git a/spring-web/spring-web.gradle b/spring-web/spring-web.gradle index 8fd59edc2f..74da77ee78 100644 --- a/spring-web/spring-web.gradle +++ b/spring-web/spring-web.gradle @@ -23,6 +23,7 @@ dependencies { optional("io.netty:netty-codec-http") // Until Netty4ClientHttpRequest is removed optional("io.netty:netty-transport") // Until Netty4ClientHttpRequest is removed optional("io.projectreactor.netty:reactor-netty-http") + optional("io.netty:netty5-buffer:5.0.0.Alpha4") optional("io.undertow:undertow-core") optional("org.apache.tomcat.embed:tomcat-embed-core") optional("org.eclipse.jetty:jetty-server") { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java index c0b399d4eb..1b944bebca 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java @@ -102,7 +102,7 @@ class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono writeWith(Publisher body) { return doCommit(() -> { - this.byteBufferFlux = Flux.from(body).map(DataBuffer::asByteBuffer); + this.byteBufferFlux = Flux.from(body).map(DataBuffer::toByteBuffer); return Mono.empty(); }); } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java index 3aed6a3d9d..afe8355317 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -122,8 +122,8 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest { private HttpRequest.BodyPublisher toBodyPublisher(Publisher body) { Publisher byteBufferBody = (body instanceof Mono ? - Mono.from(body).map(DataBuffer::asByteBuffer) : - Flux.from(body).map(DataBuffer::asByteBuffer)); + Mono.from(body).map(DataBuffer::toByteBuffer) : + Flux.from(body).map(DataBuffer::toByteBuffer)); Flow.Publisher bodyFlow = JdkFlowAdapter.publisherToFlowPublisher(byteBufferBody); diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java index 3d6b6b14b5..e2aa2f81a9 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java @@ -33,7 +33,6 @@ import reactor.core.publisher.MonoSink; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; @@ -103,7 +102,7 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { public Mono writeAndFlushWith(Publisher> body) { return writeWith(Flux.from(body) .flatMap(Function.identity()) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release)); } private String getContentType() { @@ -112,7 +111,7 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { } private ContentChunk toContentChunk(DataBuffer buffer, MonoSink sink) { - return new ContentChunk(buffer.asByteBuffer(), new Callback() { + return new ContentChunk(buffer.toByteBuffer(), new Callback() { @Override public void succeeded() { DataBufferUtils.release(buffer); diff --git a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java index c2d6986bd2..9c213a2916 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,6 @@ import org.springframework.core.codec.Encoder; import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpLogging; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; @@ -135,15 +134,15 @@ public class EncoderHttpMessageWriter implements HttpMessageWriter { Hints.touchDataBuffer(buffer, hints, logger); message.getHeaders().setContentLength(buffer.readableByteCount()); return message.writeWith(Mono.just(buffer) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release)); }) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); } if (isStreamingMediaType(contentType)) { return message.writeAndFlushWith(body.map(buffer -> { Hints.touchDataBuffer(buffer, hints, logger); - return Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + return Mono.just(buffer).doOnDiscard(DataBuffer.class, DataBufferUtils::release); })); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java index c797232b9c..c8224445b8 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java +++ b/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -129,7 +129,7 @@ public class FormHttpMessageReader extends LoggingCodecSupport return DataBufferUtils.join(message.getBody(), this.maxInMemorySize) .map(buffer -> { - CharBuffer charBuffer = charset.decode(buffer.asByteBuffer()); + CharBuffer charBuffer = charset.decode(buffer.toByteBuffer()); String body = charBuffer.toString(); DataBufferUtils.release(buffer); MultiValueMap formData = parseFormData(charset, body); diff --git a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java index c3f65a31f8..50ee186434 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,6 @@ import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpLogging; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; @@ -159,7 +158,7 @@ public class ServerSentEventHttpMessageWriter implements HttpMessageWriter body = Flux.fromIterable(map.entrySet()) .concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue(), bufferFactory)) .concatWith(generateLastLine(boundary, bufferFactory)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); if (logger.isDebugEnabled()) { body = body.doOnNext(buffer -> Hints.touchDataBuffer(buffer, hints, logger)); diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java index bb3c4af6e6..a17c05e8ec 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java @@ -321,10 +321,10 @@ final class MultipartParser extends BaseSubscriber { if (logger.isTraceEnabled()) { logger.trace("First boundary found @" + endIdx + " in " + buf); } - DataBuffer headersBuf = MultipartUtils.sliceFrom(buf, endIdx); - DataBufferUtils.release(buf); + DataBuffer preambleBuffer = buf.split(endIdx + 1); + DataBufferUtils.release(preambleBuffer); - changeState(this, new HeadersState(), headersBuf); + changeState(this, new HeadersState(), buf); } else { DataBufferUtils.release(buf); @@ -390,13 +390,11 @@ final class MultipartParser extends BaseSubscriber { } long count = this.byteCount.addAndGet(endIdx); if (belowMaxHeaderSize(count)) { - DataBuffer headerBuf = MultipartUtils.sliceTo(buf, endIdx); + DataBuffer headerBuf = buf.split(endIdx + 1); this.buffers.add(headerBuf); - DataBuffer bodyBuf = MultipartUtils.sliceFrom(buf, endIdx); - DataBufferUtils.release(buf); - emitHeaders(parseHeaders()); - changeState(this, new BodyState(), bodyBuf); + + changeState(this, new BodyState(), buf); } } else { @@ -514,32 +512,35 @@ final class MultipartParser extends BaseSubscriber { * previous buffer, so we calculate the length and slice the current * and previous buffers accordingly. We then change to {@link HeadersState} * and pass on the remainder of {@code buffer}. If the needle is not found, we - * make {@code buffer} the previous buffer. + * enqueue {@code buffer}. */ @Override public void onNext(DataBuffer buffer) { int endIdx = this.boundary.match(buffer); if (endIdx != -1) { + DataBuffer boundaryBuffer = buffer.split(endIdx + 1); if (logger.isTraceEnabled()) { logger.trace("Boundary found @" + endIdx + " in " + buffer); } - int len = endIdx - buffer.readPosition() - this.boundaryLength + 1; + int len = endIdx - this.boundaryLength + 1; if (len > 0) { // whole boundary in buffer. // slice off the body part, and flush - DataBuffer body = buffer.retainedSlice(buffer.readPosition(), len); + DataBuffer body = boundaryBuffer.split(len); + DataBufferUtils.release(boundaryBuffer); enqueue(body); flush(); } else if (len < 0) { // boundary spans multiple buffers, and we've just found the end // iterate over buffers in reverse order + DataBufferUtils.release(boundaryBuffer); DataBuffer prev; while ((prev = this.queue.pollLast()) != null) { int prevLen = prev.readableByteCount() + len; if (prevLen > 0) { // slice body part of previous buffer, and flush it - DataBuffer body = prev.retainedSlice(prev.readPosition(), prevLen); + DataBuffer body = prev.split(prevLen); DataBufferUtils.release(prev); enqueue(body); flush(); @@ -557,10 +558,7 @@ final class MultipartParser extends BaseSubscriber { flush(); } - DataBuffer remainder = MultipartUtils.sliceFrom(buffer, endIdx); - DataBufferUtils.release(buffer); - - changeState(this, new HeadersState(), remainder); + changeState(this, new HeadersState(), buffer); } else { enqueue(buffer); diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartUtils.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartUtils.java index bebe75d3bf..b7e4d07db0 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartUtils.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartUtils.java @@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMessage; import org.springframework.http.MediaType; @@ -85,23 +84,6 @@ abstract class MultipartUtils { return result; } - /** - * Slices the given buffer to the given index (exclusive). - */ - public static DataBuffer sliceTo(DataBuffer buf, int idx) { - int pos = buf.readPosition(); - int len = idx - pos + 1; - return buf.retainedSlice(pos, len); - } - - /** - * Slices the given buffer from the given index (inclusive). - */ - public static DataBuffer sliceFrom(DataBuffer buf, int idx) { - int len = buf.writePosition() - idx - 1; - return buf.retainedSlice(idx + 1, len); - } - public static void closeChannel(Channel channel) { try { if (channel.isOpen()) { diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartWriterSupport.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartWriterSupport.java index cd10fbb01a..87f02e5347 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartWriterSupport.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartWriterSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ import org.springframework.http.MediaType; import org.springframework.http.codec.LoggingCodecSupport; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.FastByteArrayOutputStream; import org.springframework.util.MimeTypeUtils; import org.springframework.util.MultiValueMap; @@ -164,22 +165,24 @@ public class MultipartWriterSupport extends LoggingCodecSupport { protected Mono generatePartHeaders(HttpHeaders headers, DataBufferFactory bufferFactory) { return Mono.fromCallable(() -> { - DataBuffer buffer = bufferFactory.allocateBuffer(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); for (Map.Entry> entry : headers.entrySet()) { byte[] headerName = entry.getKey().getBytes(getCharset()); for (String headerValueString : entry.getValue()) { byte[] headerValue = headerValueString.getBytes(getCharset()); - buffer.write(headerName); - buffer.write((byte)':'); - buffer.write((byte)' '); - buffer.write(headerValue); - buffer.write((byte)'\r'); - buffer.write((byte)'\n'); + bos.write(headerName); + bos.write((byte)':'); + bos.write((byte)' '); + bos.write(headerValue); + bos.write((byte)'\r'); + bos.write((byte)'\n'); } } - buffer.write((byte)'\r'); - buffer.write((byte)'\n'); - return buffer; + bos.write((byte)'\r'); + bos.write((byte)'\n'); + + byte[] bytes = bos.toByteArrayUnsafe(); + return bufferFactory.wrap(bytes); }); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartEventHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartEventHttpMessageWriter.java index 9a7f3565df..83439a9d63 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartEventHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartEventHttpMessageWriter.java @@ -28,7 +28,6 @@ import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.codec.HttpMessageWriter; @@ -91,7 +90,7 @@ public class PartEventHttpMessageWriter extends MultipartWriterSupport implement } })) .concatWith(generateLastLine(boundary, outputMessage.bufferFactory())) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); if (logger.isDebugEnabled()) { body = body.doOnNext(buffer -> Hints.touchDataBuffer(buffer, hints, logger)); diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java index ca7678b31c..0d82ee3449 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java @@ -729,7 +729,7 @@ final class PartGenerator extends BaseSubscriber { @SuppressWarnings("BlockingMethodInNonBlockingContext") private Mono writeInternal(DataBuffer dataBuffer) { try { - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(); + ByteBuffer byteBuffer = dataBuffer.toByteBuffer(); while (byteBuffer.hasRemaining()) { this.channel.write(byteBuffer); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartHttpMessageWriter.java index 31817470ca..3c64a862bf 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartHttpMessageWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,6 @@ import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; @@ -67,7 +66,7 @@ public class PartHttpMessageWriter extends MultipartWriterSupport implements Htt Flux body = Flux.from(parts) .concatMap(part -> encodePart(boundary, part, outputMessage.bufferFactory())) .concatWith(generateLastLine(boundary, outputMessage.bufferFactory())) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); if (logger.isDebugEnabled()) { body = body.doOnNext(buffer -> Hints.touchDataBuffer(buffer, hints, logger)); diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java index 4804dcf1f4..776f769564 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -151,7 +151,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder(new NettyByteBufDecoder())); } + if (netty5BufferPresent) { + addCodec(this.typedReaders, new DecoderHttpMessageReader<>(new Netty5BufferDecoder())); + } addCodec(this.typedReaders, new ResourceHttpMessageReader(new ResourceDecoder())); addCodec(this.typedReaders, new DecoderHttpMessageReader<>(StringDecoder.textPlainOnly())); if (protobufPresent) { @@ -557,6 +565,9 @@ class BaseDefaultCodecs implements CodecConfigurer.DefaultCodecs, CodecConfigure if (nettyByteBufPresent) { addCodec(writers, new EncoderHttpMessageWriter<>(new NettyByteBufEncoder())); } + if (netty5BufferPresent) { + addCodec(writers, new EncoderHttpMessageWriter<>(new Netty5BufferEncoder())); + } addCodec(writers, new ResourceHttpMessageWriter()); addCodec(writers, new EncoderHttpMessageWriter<>(CharSequenceEncoder.textPlainOnly())); if (protobufPresent) { diff --git a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java index 06961fce35..74a4900339 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -181,7 +181,7 @@ public class XmlEventDecoder extends AbstractDecoder { public List apply(DataBuffer dataBuffer) { try { increaseByteCount(dataBuffer); - this.streamReader.getInputFeeder().feedInput(dataBuffer.asByteBuffer()); + this.streamReader.getInputFeeder().feedInput(dataBuffer.toByteBuffer()); List events = new ArrayList<>(); while (true) { if (this.streamReader.next() == AsyncXMLStreamReader.EVENT_INCOMPLETE) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index 9de245e71f..d9eb907b0e 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,6 @@ import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatusCode; import org.springframework.http.ResponseCookie; @@ -190,7 +189,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { try { return writeWithInternal(Mono.fromCallable(() -> buffer) .doOnSubscribe(s -> subscribed.set(true)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release)); } catch (Throwable ex) { return Mono.error(ex); @@ -204,7 +203,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { }); }) .doOnError(t -> getHeaders().clearContentHeaders()) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); } else { return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner))) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java index 2862ce8e7a..35964ca6d7 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -135,7 +135,7 @@ public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter { @Override protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException { - ByteBuffer input = dataBuffer.asByteBuffer(); + ByteBuffer input = dataBuffer.toByteBuffer(); int len = input.remaining(); ServletResponse response = getNativeResponse(); ((HttpOutput) response.getOutputStream()).write(input); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java index 5223c1a91c..91f3272a80 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,6 @@ import org.apache.coyote.Response; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.util.Assert; @@ -136,29 +135,20 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter { // It's possible InputStream can be wrapped, preventing use of CoyoteInputStream return super.readFromInputStream(); } - boolean release = true; - int capacity = this.bufferSize; - DataBuffer dataBuffer = this.factory.allocateBuffer(capacity); - try { - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, capacity); - int read = coyoteInputStream.read(byteBuffer); - logBytesRead(read); - if (read > 0) { - dataBuffer.writePosition(read); - release = false; - return dataBuffer; - } - else if (read == -1) { - return EOF_BUFFER; - } - else { - return AbstractListenerReadPublisher.EMPTY_BUFFER; - } + ByteBuffer byteBuffer = this.factory.isDirect() ? + ByteBuffer.allocateDirect(this.bufferSize) : + ByteBuffer.allocate(this.bufferSize); + + int read = coyoteInputStream.read(byteBuffer); + logBytesRead(read); + if (read > 0) { + return this.factory.wrap(byteBuffer); } - finally { - if (release) { - DataBufferUtils.release(dataBuffer); - } + else if (read == -1) { + return EOF_BUFFER; + } + else { + return AbstractListenerReadPublisher.EMPTY_BUFFER; } } } @@ -233,7 +223,7 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter { @Override protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException { - ByteBuffer input = dataBuffer.asByteBuffer(); + ByteBuffer input = dataBuffer.toByteBuffer(); int len = input.remaining(); ServletResponse response = getNativeResponse(); ((CoyoteOutputStream) response.getOutputStream()).write(input); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index a3a450fe50..70a5b9c321 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -235,7 +235,7 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl @Override protected void dataReceived(DataBuffer dataBuffer) { super.dataReceived(dataBuffer); - this.byteBuffer = dataBuffer.asByteBuffer(); + this.byteBuffer = dataBuffer.toByteBuffer(); } @Override diff --git a/spring-web/src/test/java/org/springframework/http/codec/support/ClientCodecConfigurerTests.java b/spring-web/src/test/java/org/springframework/http/codec/support/ClientCodecConfigurerTests.java index 83fc0d7854..80986d780f 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/support/ClientCodecConfigurerTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/support/ClientCodecConfigurerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,8 @@ import org.springframework.core.codec.DataBufferDecoder; import org.springframework.core.codec.DataBufferEncoder; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; +import org.springframework.core.codec.Netty5BufferDecoder; +import org.springframework.core.codec.Netty5BufferEncoder; import org.springframework.core.codec.NettyByteBufDecoder; import org.springframework.core.codec.NettyByteBufEncoder; import org.springframework.core.codec.ResourceDecoder; @@ -86,11 +88,12 @@ public class ClientCodecConfigurerTests { @Test public void defaultReaders() { List> readers = this.configurer.getReaders(); - assertThat(readers.size()).isEqualTo(14); + assertThat(readers.size()).isEqualTo(15); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteArrayDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(DataBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(NettyByteBufDecoder.class); + assertThat(getNextDecoder(readers).getClass()).isEqualTo(Netty5BufferDecoder.class); assertThat(readers.get(this.index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageReader.class); assertStringDecoder(getNextDecoder(readers), true); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ProtobufDecoder.class); @@ -107,11 +110,12 @@ public class ClientCodecConfigurerTests { @Test public void defaultWriters() { List> writers = this.configurer.getWriters(); - assertThat(writers.size()).isEqualTo(14); + assertThat(writers.size()).isEqualTo(15); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteArrayEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(DataBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(NettyByteBufEncoder.class); + assertThat(getNextEncoder(writers).getClass()).isEqualTo(Netty5BufferEncoder.class); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageWriter.class); assertStringEncoder(getNextEncoder(writers), true); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ProtobufHttpMessageWriter.class); @@ -172,11 +176,12 @@ public class ClientCodecConfigurerTests { int size = 99; this.configurer.defaultCodecs().maxInMemorySize(size); List> readers = this.configurer.getReaders(); - assertThat(readers.size()).isEqualTo(14); + assertThat(readers.size()).isEqualTo(15); assertThat(((ByteArrayDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((ByteBufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((DataBufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((NettyByteBufDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); + assertThat(((Netty5BufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((ResourceDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((StringDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((ProtobufDecoder) getNextDecoder(readers)).getMaxMessageSize()).isEqualTo(size); @@ -230,7 +235,7 @@ public class ClientCodecConfigurerTests { writers = findCodec(this.configurer.getWriters(), MultipartHttpMessageWriter.class).getPartWriters(); assertThat(sseDecoder).isNotSameAs(jackson2Decoder); - assertThat(writers).hasSize(12); + assertThat(writers).hasSize(13); } @Test // gh-24194 @@ -240,7 +245,7 @@ public class ClientCodecConfigurerTests { List> writers = findCodec(clone.getWriters(), MultipartHttpMessageWriter.class).getPartWriters(); - assertThat(writers).hasSize(12); + assertThat(writers).hasSize(13); } @Test @@ -254,7 +259,7 @@ public class ClientCodecConfigurerTests { List> writers = findCodec(clone.getWriters(), MultipartHttpMessageWriter.class).getPartWriters(); - assertThat(writers).hasSize(12); + assertThat(writers).hasSize(13); } private Decoder getNextDecoder(List> readers) { diff --git a/spring-web/src/test/java/org/springframework/http/codec/support/CodecConfigurerTests.java b/spring-web/src/test/java/org/springframework/http/codec/support/CodecConfigurerTests.java index 003646099f..98ab1b6576 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/support/CodecConfigurerTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/support/CodecConfigurerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,8 @@ import org.springframework.core.codec.DataBufferDecoder; import org.springframework.core.codec.DataBufferEncoder; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; +import org.springframework.core.codec.Netty5BufferDecoder; +import org.springframework.core.codec.Netty5BufferEncoder; import org.springframework.core.codec.NettyByteBufDecoder; import org.springframework.core.codec.NettyByteBufEncoder; import org.springframework.core.codec.StringDecoder; @@ -81,11 +83,12 @@ class CodecConfigurerTests { @Test void defaultReaders() { List> readers = this.configurer.getReaders(); - assertThat(readers.size()).isEqualTo(13); + assertThat(readers.size()).isEqualTo(14); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteArrayDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(DataBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(NettyByteBufDecoder.class); + assertThat(getNextDecoder(readers).getClass()).isEqualTo(Netty5BufferDecoder.class); assertThat(readers.get(this.index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageReader.class); assertStringDecoder(getNextDecoder(readers), true); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ProtobufDecoder.class); @@ -100,11 +103,12 @@ class CodecConfigurerTests { @Test void defaultWriters() { List> writers = this.configurer.getWriters(); - assertThat(writers.size()).isEqualTo(12); + assertThat(writers.size()).isEqualTo(13); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteArrayEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(DataBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(NettyByteBufEncoder.class); + assertThat(getNextEncoder(writers).getClass()).isEqualTo(Netty5BufferEncoder.class); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageWriter.class); assertStringEncoder(getNextEncoder(writers), true); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ProtobufHttpMessageWriter.class); @@ -137,13 +141,14 @@ class CodecConfigurerTests { List> readers = this.configurer.getReaders(); - assertThat(readers.size()).isEqualTo(17); + assertThat(readers.size()).isEqualTo(18); assertThat(getNextDecoder(readers)).isSameAs(customDecoder1); assertThat(readers.get(this.index.getAndIncrement())).isSameAs(customReader1); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteArrayDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(DataBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(NettyByteBufDecoder.class); + assertThat(getNextDecoder(readers).getClass()).isEqualTo(Netty5BufferDecoder.class); assertThat(readers.get(this.index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageReader.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(StringDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ProtobufDecoder.class); @@ -179,13 +184,14 @@ class CodecConfigurerTests { List> writers = this.configurer.getWriters(); - assertThat(writers.size()).isEqualTo(16); + assertThat(writers.size()).isEqualTo(17); assertThat(getNextEncoder(writers)).isSameAs(customEncoder1); assertThat(writers.get(this.index.getAndIncrement())).isSameAs(customWriter1); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteArrayEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(DataBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(NettyByteBufEncoder.class); + assertThat(getNextEncoder(writers).getClass()).isEqualTo(Netty5BufferEncoder.class); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageWriter.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(CharSequenceEncoder.class); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ProtobufHttpMessageWriter.class); diff --git a/spring-web/src/test/java/org/springframework/http/codec/support/ServerCodecConfigurerTests.java b/spring-web/src/test/java/org/springframework/http/codec/support/ServerCodecConfigurerTests.java index 163795dcb7..f1f91a511b 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/support/ServerCodecConfigurerTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/support/ServerCodecConfigurerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,8 @@ import org.springframework.core.codec.DataBufferDecoder; import org.springframework.core.codec.DataBufferEncoder; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; +import org.springframework.core.codec.Netty5BufferDecoder; +import org.springframework.core.codec.Netty5BufferEncoder; import org.springframework.core.codec.NettyByteBufDecoder; import org.springframework.core.codec.NettyByteBufEncoder; import org.springframework.core.codec.ResourceDecoder; @@ -86,11 +88,12 @@ public class ServerCodecConfigurerTests { @Test public void defaultReaders() { List> readers = this.configurer.getReaders(); - assertThat(readers.size()).isEqualTo(16); + assertThat(readers.size()).isEqualTo(17); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteArrayDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(DataBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(NettyByteBufDecoder.class); + assertThat(getNextDecoder(readers).getClass()).isEqualTo(Netty5BufferDecoder.class); assertThat(readers.get(this.index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageReader.class); assertStringDecoder(getNextDecoder(readers), true); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ProtobufDecoder.class); @@ -108,11 +111,12 @@ public class ServerCodecConfigurerTests { @Test public void defaultWriters() { List> writers = this.configurer.getWriters(); - assertThat(writers.size()).isEqualTo(14); + assertThat(writers.size()).isEqualTo(15); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteArrayEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(DataBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(NettyByteBufEncoder.class); + assertThat(getNextEncoder(writers).getClass()).isEqualTo(Netty5BufferEncoder.class); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageWriter.class); assertStringEncoder(getNextEncoder(writers), true); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ProtobufHttpMessageWriter.class); @@ -152,6 +156,7 @@ public class ServerCodecConfigurerTests { assertThat(((ByteBufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((DataBufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((NettyByteBufDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); + assertThat(((Netty5BufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((ResourceDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((StringDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((ProtobufDecoder) getNextDecoder(readers)).getMaxMessageSize()).isEqualTo(size); diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle index 67773a507c..21b587f87b 100644 --- a/spring-webflux/spring-webflux.gradle +++ b/spring-webflux/spring-webflux.gradle @@ -54,6 +54,7 @@ dependencies { testRuntimeOnly("com.sun.xml.bind:jaxb-core") testRuntimeOnly("com.sun.xml.bind:jaxb-impl") testRuntimeOnly("com.sun.activation:jakarta.activation") + testRuntimeOnly("io.netty:netty5-buffer:5.0.0.Alpha4") } test { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/view/freemarker/FreeMarkerView.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/view/freemarker/FreeMarkerView.java index e07d06db69..f399b719e8 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/view/freemarker/FreeMarkerView.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/view/freemarker/FreeMarkerView.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,10 +42,10 @@ import org.springframework.context.ApplicationContextException; import org.springframework.context.i18n.LocaleContextHolder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.MediaType; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.FastByteArrayOutputStream; import org.springframework.util.MimeType; import org.springframework.web.reactive.result.view.AbstractUrlBasedView; import org.springframework.web.reactive.result.view.RequestContext; @@ -252,24 +252,21 @@ public class FreeMarkerView extends AbstractUrlBasedView { } Locale locale = LocaleContextHolder.getLocale(exchange.getLocaleContext()); - DataBuffer dataBuffer = exchange.getResponse().bufferFactory().allocateBuffer(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); try { Charset charset = getCharset(contentType); - Writer writer = new OutputStreamWriter(dataBuffer.asOutputStream(), charset); + Writer writer = new OutputStreamWriter(bos, charset); getTemplate(locale).process(freeMarkerModel, writer); - return dataBuffer; + + byte[] bytes = bos.toByteArrayUnsafe(); + return exchange.getResponse().bufferFactory().wrap(bytes); } catch (IOException ex) { - DataBufferUtils.release(dataBuffer); String message = "Could not load FreeMarker template for URL [" + getUrl() + "]"; throw new IllegalStateException(message, ex); } - catch (Throwable ex) { - DataBufferUtils.release(dataBuffer); - throw ex; - } }) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release)); } private Charset getCharset(@Nullable MediaType mediaType) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index 5baa7971a0..ae303f2907 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -84,7 +84,7 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession dataBuffers = new ArrayList<>(byteBuffers.length); + for (ByteBuffer byteBuffer : byteBuffers) { + dataBuffers.add(this.session.bufferFactory().wrap(byteBuffer)); + } + DataBuffer joined = this.session.bufferFactory().join(dataBuffers); + return new WebSocketMessage(type, joined); } else { throw new IllegalArgumentException("Unexpected message type: " + message); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index dfe12a6213..9945be92dd 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -76,7 +76,7 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession> readers = adapter.getMessageReaders(); - assertThat(readers.size()).isEqualTo(15); + assertThat(readers.size()).isEqualTo(16); ResolvableType multiValueMapType = forClassWithGenerics(MultiValueMap.class, String.class, String.class); @@ -199,7 +199,7 @@ public class WebFluxConfigurationSupportTests { assertThat(handler.getOrder()).isEqualTo(0); List> writers = handler.getMessageWriters(); - assertThat(writers.size()).isEqualTo(13); + assertThat(writers.size()).isEqualTo(14); assertHasMessageWriter(writers, forClass(byte[].class), APPLICATION_OCTET_STREAM); assertHasMessageWriter(writers, forClass(ByteBuffer.class), APPLICATION_OCTET_STREAM); @@ -227,7 +227,7 @@ public class WebFluxConfigurationSupportTests { assertThat(handler.getOrder()).isEqualTo(100); List> writers = handler.getMessageWriters(); - assertThat(writers.size()).isEqualTo(13); + assertThat(writers.size()).isEqualTo(14); assertHasMessageWriter(writers, forClass(byte[].class), APPLICATION_OCTET_STREAM); assertHasMessageWriter(writers, forClass(ByteBuffer.class), APPLICATION_OCTET_STREAM); diff --git a/src/docs/asciidoc/core/core-databuffer-codec.adoc b/src/docs/asciidoc/core/core-databuffer-codec.adoc index 7abf40a0cd..1f9f6e995a 100644 --- a/src/docs/asciidoc/core/core-databuffer-codec.adoc +++ b/src/docs/asciidoc/core/core-databuffer-codec.adoc @@ -125,7 +125,7 @@ release it immediately, it can do so via `DataBufferUtils.release(dataBuffer)`. . If a `Decoder` is using `Flux` or `Mono` operators such as `flatMap`, `reduce`, and others that prefetch and cache data items internally, or is using operators such as `filter`, `skip`, and others that leave out items, then -`doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)` must be added to the +`doOnDiscard(DataBuffer.class, DataBufferUtils::release)` must be added to the composition chain to ensure such buffers are released prior to being discarded, possibly also as a result of an error or cancellation signal. . If a `Decoder` holds on to one or more data buffers in any other way, it must