From f5ad90a1ead8eff4fdc79157554abaf745fc86ed Mon Sep 17 00:00:00 2001 From: jinkshower Date: Sat, 7 Sep 2024 17:12:30 +0900 Subject: [PATCH] Fix `XAddOptions` maxlen handling and `XPendingOptions` validation. Closes #2982 Original pull request: #2985 --- .../connection/ReactiveStreamCommands.java | 22 ++++++---- .../redis/connection/RedisStreamCommands.java | 15 +++++-- .../connection/stream/StreamReadOptions.java | 2 +- .../ReactiveStreamCommandsUnitTests.java | 41 +++++++++++++++++++ .../RedisStreamCommandsUnitTests.java | 36 ++++++++++++++++ 5 files changed, 104 insertions(+), 12 deletions(-) create mode 100644 src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java create mode 100644 src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index 460a8cc3f..bba360379 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -335,7 +335,7 @@ public interface ReactiveStreamCommands { * @since 2.3 */ public boolean hasMaxlen() { - return maxlen != null && maxlen > 0; + return maxlen != null; } /** @@ -654,7 +654,7 @@ public interface ReactiveStreamCommands { Assert.notNull(key, "Key must not be null"); Assert.notNull(groupName, "GroupName must not be null"); - return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null))).next() + return xPendingSummary(Mono.just(PendingRecordsCommand.pending(key, groupName))).next() .map(CommandResponse::getOutput); } @@ -695,7 +695,7 @@ public interface ReactiveStreamCommands { */ @Nullable default Mono xPending(ByteBuffer key, String groupName, String consumerName) { - return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null))).next() + return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName))).next() .map(CommandResponse::getOutput); } @@ -712,7 +712,7 @@ public interface ReactiveStreamCommands { * @since 2.3 */ default Mono xPending(ByteBuffer key, String groupName, Range range, Long count) { - return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count))).next() + return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count))).next() .map(CommandResponse::getOutput); } @@ -748,8 +748,8 @@ public interface ReactiveStreamCommands { */ default Mono xPending(ByteBuffer key, String groupName, String consumerName, Range range, Long count) { - return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count))).next() - .map(CommandResponse::getOutput); + return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count))) + .next().map(CommandResponse::getOutput); } /** @@ -801,9 +801,15 @@ public interface ReactiveStreamCommands { /** * Create new {@link PendingRecordsCommand} with given {@link Range} and limit. * + * @param range must not be {@literal null}. + * @param count the max number of messages to return. Must not be negative. * @return new instance of {@link XPendingOptions}. */ - public PendingRecordsCommand range(Range range, Long count) { + public PendingRecordsCommand range(Range range, Long count) { + + Assert.notNull(range, "Range must not be null"); + Assert.isTrue(count > -1, "Count must not be negative"); + return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count); } @@ -855,7 +861,7 @@ public interface ReactiveStreamCommands { * @return {@literal true} count is set. */ public boolean isLimited() { - return count != null && count > -1; + return count != null; } } diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index df9cbd9f5..df097158b 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -214,7 +214,7 @@ public interface RedisStreamCommands { * @return {@literal true} if {@literal MAXLEN} is set. */ public boolean hasMaxlen() { - return maxlen != null && maxlen > 0; + return maxlen != null; } /** @@ -788,19 +788,28 @@ public interface RedisStreamCommands { /** * Create new {@link XPendingOptions} with an unbounded {@link Range} ({@literal - +}). * - * @param count the max number of messages to return. Must not be {@literal null}. + * @param count the max number of messages to return. Must not be negative. * @return new instance of {@link XPendingOptions}. */ public static XPendingOptions unbounded(Long count) { + + Assert.isTrue(count > -1, "Count must not be negative"); + return new XPendingOptions(null, Range.unbounded(), count); } /** * Create new {@link XPendingOptions} with given {@link Range} and limit. * + * @param range must not be {@literal null}. + * @param count the max number of messages to return. Must not be negative. * @return new instance of {@link XPendingOptions}. */ public static XPendingOptions range(Range range, Long count) { + + Assert.notNull(range, "Range must not be null"); + Assert.isTrue(count > -1, "Count must not be negative"); + return new XPendingOptions(null, range, count); } @@ -848,7 +857,7 @@ public interface RedisStreamCommands { * @return {@literal true} count is set. */ public boolean isLimited() { - return count != null && count > -1; + return count != null; } } diff --git a/src/main/java/org/springframework/data/redis/connection/stream/StreamReadOptions.java b/src/main/java/org/springframework/data/redis/connection/stream/StreamReadOptions.java index 45c456c0b..58fd0652e 100644 --- a/src/main/java/org/springframework/data/redis/connection/stream/StreamReadOptions.java +++ b/src/main/java/org/springframework/data/redis/connection/stream/StreamReadOptions.java @@ -96,7 +96,7 @@ public class StreamReadOptions { */ public StreamReadOptions count(long count) { - Assert.isTrue(count > 0, "Count must be greater or equal to zero"); + Assert.isTrue(count > 0, "Count must be greater than zero"); return new StreamReadOptions(block, count, noack); } diff --git a/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java new file mode 100644 index 000000000..bef26ac86 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java @@ -0,0 +1,41 @@ +package org.springframework.data.redis.connection; + +import static org.assertj.core.api.Assertions.*; + +import java.nio.ByteBuffer; + +import org.junit.jupiter.api.Test; + +import org.springframework.data.domain.Range; +import org.springframework.data.redis.connection.ReactiveStreamCommands.PendingRecordsCommand; + +/** + * Unit tests for {@link ReactiveStreamCommands}. + * + * @author jinkshower + */ +class ReactiveStreamCommandsUnitTests { + + @Test // GH-2982 + void pendingRecordsCommandRangeShouldThrowExceptionWhenRangeIsNull() { + + ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes()); + String groupName = "my-group"; + + PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName); + + assertThatThrownBy(() -> command.range(null, 10L)).isInstanceOf(IllegalArgumentException.class); + } + + @Test // GH-2982 + void pendingRecordsCommandRangeShouldThrowExceptionWhenCountIsNegative() { + + ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes()); + String groupName = "my-group"; + + PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName); + Range range = Range.closed("0", "10"); + + assertThatThrownBy(() -> command.range(range, -1L)).isInstanceOf(IllegalArgumentException.class); + } +} diff --git a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java new file mode 100644 index 000000000..635f6cbe1 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java @@ -0,0 +1,36 @@ +package org.springframework.data.redis.connection; + +import static org.assertj.core.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +import org.springframework.data.domain.Range; +import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions; + +/** + * Unit tests for {@link RedisStreamCommands}. + * + * @author jinkshower + */ +class RedisStreamCommandsUnitTests { + + @Test // GH-2982 + void xPendingOptionsUnboundedShouldThrowExceptionWhenCountIsNegative() { + + assertThatThrownBy(() -> XPendingOptions.unbounded(-1L)).isInstanceOf(IllegalArgumentException.class); + } + + @Test // GH-2982 + void xPendingOptionsRangeShouldThrowExceptionWhenRangeIsNull() { + + assertThatThrownBy(() -> XPendingOptions.range(null, 10L)).isInstanceOf(IllegalArgumentException.class); + } + + @Test // GH-2982 + void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() { + + Range range = Range.closed("0", "10"); + + assertThatThrownBy(() -> XPendingOptions.range(range, -1L)).isInstanceOf(IllegalArgumentException.class); + } +}