Adds support to lPop or rPop N elements from a Redis List in ReactiveListOperations.

Closes #2692
Original pull request: #2704
This commit is contained in:
John Blum
2023-09-07 16:58:33 -07:00
committed by Mark Paluch
parent ea4acad379
commit 157f5e4b1d
5 changed files with 165 additions and 10 deletions

View File

@@ -39,6 +39,7 @@ import org.springframework.util.Assert;
*
* @author Mark Paluch
* @author Christoph Strobl
* @author John Blum
* @since 2.0
*/
class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V> {
@@ -244,13 +245,21 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
return createMono(listCommands ->
listCommands.blPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
}
@Override
public Flux<V> leftPop(K key, long count) {
Assert.notNull(key, "Key must not be null");
return createFlux(listCommands -> listCommands.lPop(rawKey(key), count).map(this::readValue));
}
@Override
public Mono<V> rightPop(K key) {
@@ -264,13 +273,21 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
return createMono(listCommands ->
listCommands.brPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
}
@Override
public Flux<V> rightPop(K key, long count) {
Assert.notNull(key, "Key must not be null");
return createFlux(listCommands -> listCommands.rPop(rawKey(key), count).map(this::readValue));
}
@Override
public Mono<V> rightPopAndLeftPush(K sourceKey, K destinationKey) {
@@ -287,7 +304,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(sourceKey, "Source key must not be null");
Assert.notNull(destinationKey, "Destination key must not be null");
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
return createMono(listCommands ->
listCommands.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue));
@@ -315,7 +332,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
return template.doCreateFlux(connection -> function.apply(connection.listCommands()));
}
private boolean isZeroOrGreater1Second(Duration timeout) {
private boolean isZeroOrGreaterOneSecond(Duration timeout) {
return timeout.isZero() || timeout.getNano() % TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) == 0;
}

View File

@@ -32,6 +32,7 @@ import org.springframework.util.Assert;
*
* @author Mark Paluch
* @author Christoph Strobl
* @author John Blum
* @see <a href="https://redis.io/commands#list">Redis Documentation: List Commands</a>
* @since 2.0
*/
@@ -325,6 +326,16 @@ public interface ReactiveListOperations<K, V> {
*/
Mono<V> leftPop(K key, Duration timeout);
/**
* Removes {@link Long count} elements from the left-side of the Redis list stored at key.
*
* @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}.
* @param count {@link Long count} of the number of elements to remove from the left-side of the Redis list.
* @return a {@link Flux} containing the elements removed from the Redis list.
* @since 3.2
*/
Flux<V> leftPop(K key, long count);
/**
* Removes and returns last element in list stored at {@code key}.
*
@@ -347,6 +358,16 @@ public interface ReactiveListOperations<K, V> {
*/
Mono<V> rightPop(K key, Duration timeout);
/**
* Removes {@link Long count} elements from the right-side of the Redis list stored at key.
*
* @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}.
* @param count {@link Long count} of the number of elements to remove from the right-side of the Redis list.
* @return a {@link Flux} containing the elements removed from the Redis list.
* @since 3.2
*/
Flux<V> rightPop(K key, long count);
/**
* Remove the last element from list at {@code sourceKey}, append it to {@code destinationKey} and return its value.
*