Fix XAddOptions maxlen handling and XPendingOptions validation.
Closes #2982 Original pull request: #2985
This commit is contained in:
@@ -335,7 +335,7 @@ public interface ReactiveStreamCommands {
|
||||
* @since 2.3
|
||||
*/
|
||||
public boolean hasMaxlen() {
|
||||
return maxlen != null && maxlen > 0;
|
||||
return maxlen != null;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -654,7 +654,7 @@ public interface ReactiveStreamCommands {
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
Assert.notNull(groupName, "GroupName must not be null");
|
||||
|
||||
return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null))).next()
|
||||
return xPendingSummary(Mono.just(PendingRecordsCommand.pending(key, groupName))).next()
|
||||
.map(CommandResponse::getOutput);
|
||||
}
|
||||
|
||||
@@ -695,7 +695,7 @@ public interface ReactiveStreamCommands {
|
||||
*/
|
||||
@Nullable
|
||||
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName) {
|
||||
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null))).next()
|
||||
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName))).next()
|
||||
.map(CommandResponse::getOutput);
|
||||
}
|
||||
|
||||
@@ -712,7 +712,7 @@ public interface ReactiveStreamCommands {
|
||||
* @since 2.3
|
||||
*/
|
||||
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count) {
|
||||
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count))).next()
|
||||
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count))).next()
|
||||
.map(CommandResponse::getOutput);
|
||||
}
|
||||
|
||||
@@ -748,8 +748,8 @@ public interface ReactiveStreamCommands {
|
||||
*/
|
||||
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
|
||||
Long count) {
|
||||
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count))).next()
|
||||
.map(CommandResponse::getOutput);
|
||||
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count)))
|
||||
.next().map(CommandResponse::getOutput);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -801,9 +801,15 @@ public interface ReactiveStreamCommands {
|
||||
/**
|
||||
* Create new {@link PendingRecordsCommand} with given {@link Range} and limit.
|
||||
*
|
||||
* @param range must not be {@literal null}.
|
||||
* @param count the max number of messages to return. Must not be negative.
|
||||
* @return new instance of {@link XPendingOptions}.
|
||||
*/
|
||||
public PendingRecordsCommand range(Range<String> range, Long count) {
|
||||
public PendingRecordsCommand range(Range<?> range, Long count) {
|
||||
|
||||
Assert.notNull(range, "Range must not be null");
|
||||
Assert.isTrue(count > -1, "Count must not be negative");
|
||||
|
||||
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
|
||||
}
|
||||
|
||||
@@ -855,7 +861,7 @@ public interface ReactiveStreamCommands {
|
||||
* @return {@literal true} count is set.
|
||||
*/
|
||||
public boolean isLimited() {
|
||||
return count != null && count > -1;
|
||||
return count != null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -214,7 +214,7 @@ public interface RedisStreamCommands {
|
||||
* @return {@literal true} if {@literal MAXLEN} is set.
|
||||
*/
|
||||
public boolean hasMaxlen() {
|
||||
return maxlen != null && maxlen > 0;
|
||||
return maxlen != null;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -788,19 +788,28 @@ public interface RedisStreamCommands {
|
||||
/**
|
||||
* Create new {@link XPendingOptions} with an unbounded {@link Range} ({@literal - +}).
|
||||
*
|
||||
* @param count the max number of messages to return. Must not be {@literal null}.
|
||||
* @param count the max number of messages to return. Must not be negative.
|
||||
* @return new instance of {@link XPendingOptions}.
|
||||
*/
|
||||
public static XPendingOptions unbounded(Long count) {
|
||||
|
||||
Assert.isTrue(count > -1, "Count must not be negative");
|
||||
|
||||
return new XPendingOptions(null, Range.unbounded(), count);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create new {@link XPendingOptions} with given {@link Range} and limit.
|
||||
*
|
||||
* @param range must not be {@literal null}.
|
||||
* @param count the max number of messages to return. Must not be negative.
|
||||
* @return new instance of {@link XPendingOptions}.
|
||||
*/
|
||||
public static XPendingOptions range(Range<?> range, Long count) {
|
||||
|
||||
Assert.notNull(range, "Range must not be null");
|
||||
Assert.isTrue(count > -1, "Count must not be negative");
|
||||
|
||||
return new XPendingOptions(null, range, count);
|
||||
}
|
||||
|
||||
@@ -848,7 +857,7 @@ public interface RedisStreamCommands {
|
||||
* @return {@literal true} count is set.
|
||||
*/
|
||||
public boolean isLimited() {
|
||||
return count != null && count > -1;
|
||||
return count != null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -96,7 +96,7 @@ public class StreamReadOptions {
|
||||
*/
|
||||
public StreamReadOptions count(long count) {
|
||||
|
||||
Assert.isTrue(count > 0, "Count must be greater or equal to zero");
|
||||
Assert.isTrue(count > 0, "Count must be greater than zero");
|
||||
|
||||
return new StreamReadOptions(block, count, noack);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package org.springframework.data.redis.connection;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.data.domain.Range;
|
||||
import org.springframework.data.redis.connection.ReactiveStreamCommands.PendingRecordsCommand;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link ReactiveStreamCommands}.
|
||||
*
|
||||
* @author jinkshower
|
||||
*/
|
||||
class ReactiveStreamCommandsUnitTests {
|
||||
|
||||
@Test // GH-2982
|
||||
void pendingRecordsCommandRangeShouldThrowExceptionWhenRangeIsNull() {
|
||||
|
||||
ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes());
|
||||
String groupName = "my-group";
|
||||
|
||||
PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName);
|
||||
|
||||
assertThatThrownBy(() -> command.range(null, 10L)).isInstanceOf(IllegalArgumentException.class);
|
||||
}
|
||||
|
||||
@Test // GH-2982
|
||||
void pendingRecordsCommandRangeShouldThrowExceptionWhenCountIsNegative() {
|
||||
|
||||
ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes());
|
||||
String groupName = "my-group";
|
||||
|
||||
PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName);
|
||||
Range<?> range = Range.closed("0", "10");
|
||||
|
||||
assertThatThrownBy(() -> command.range(range, -1L)).isInstanceOf(IllegalArgumentException.class);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package org.springframework.data.redis.connection;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.data.domain.Range;
|
||||
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link RedisStreamCommands}.
|
||||
*
|
||||
* @author jinkshower
|
||||
*/
|
||||
class RedisStreamCommandsUnitTests {
|
||||
|
||||
@Test // GH-2982
|
||||
void xPendingOptionsUnboundedShouldThrowExceptionWhenCountIsNegative() {
|
||||
|
||||
assertThatThrownBy(() -> XPendingOptions.unbounded(-1L)).isInstanceOf(IllegalArgumentException.class);
|
||||
}
|
||||
|
||||
@Test // GH-2982
|
||||
void xPendingOptionsRangeShouldThrowExceptionWhenRangeIsNull() {
|
||||
|
||||
assertThatThrownBy(() -> XPendingOptions.range(null, 10L)).isInstanceOf(IllegalArgumentException.class);
|
||||
}
|
||||
|
||||
@Test // GH-2982
|
||||
void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() {
|
||||
|
||||
Range<?> range = Range.closed("0", "10");
|
||||
|
||||
assertThatThrownBy(() -> XPendingOptions.range(range, -1L)).isInstanceOf(IllegalArgumentException.class);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user