Correctly apply Reactive zadd NX/XX command flags.

We now correctly apply if exists/if not exists constraints on the reactive zadd command.

Previously, we only considered upsert which wasn't sufficient to apply xx/nx.

Closes #2731
This commit is contained in:
Mark Paluch
2023-10-12 10:03:02 +02:00
parent 9bd8fda30e
commit df4be6f8b5
3 changed files with 245 additions and 78 deletions

View File

@@ -25,14 +25,17 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.data.domain.Range;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.redis.connection.RedisZSetCommands.ZAddArgs.Flag;
import org.springframework.data.redis.connection.zset.Aggregate;
import org.springframework.data.redis.connection.zset.DefaultTuple;
import org.springframework.data.redis.connection.zset.Tuple;
@@ -61,23 +64,16 @@ public interface ReactiveZSetCommands {
class ZAddCommand extends KeyCommand {
private final List<Tuple> tuples;
private final boolean upsert;
private final boolean returnTotalChanged;
private final Set<Flag> flags;
private final boolean incr;
private final boolean gt;
private final boolean lt;
private ZAddCommand(@Nullable ByteBuffer key, List<Tuple> tuples, boolean upsert, boolean returnTotalChanged,
boolean incr, boolean gt, boolean lt) {
private ZAddCommand(@Nullable ByteBuffer key, List<Tuple> tuples, Set<Flag> flags, boolean incr) {
super(key);
this.tuples = tuples;
this.upsert = upsert;
this.returnTotalChanged = returnTotalChanged;
this.flags = flags;
this.incr = incr;
this.gt = gt;
this.lt = lt;
}
/**
@@ -103,7 +99,7 @@ public interface ReactiveZSetCommands {
Assert.notNull(tuples, "Tuples must not be null");
return new ZAddCommand(null, new ArrayList<>(tuples), false, false, false, false, false);
return new ZAddCommand(null, new ArrayList<>(tuples), EnumSet.noneOf(Flag.class), false);
}
/**
@@ -116,7 +112,7 @@ public interface ReactiveZSetCommands {
Assert.notNull(key, "Key must not be null");
return new ZAddCommand(key, tuples, upsert, returnTotalChanged, incr, gt, lt);
return new ZAddCommand(key, tuples, flags, incr);
}
/**
@@ -126,7 +122,11 @@ public interface ReactiveZSetCommands {
* @return a new {@link ZAddCommand} with {@literal xx} applied.
*/
public ZAddCommand xx() {
return new ZAddCommand(getKey(), tuples, false, returnTotalChanged, incr, gt, lt);
EnumSet<Flag> flags = EnumSet.copyOf(this.flags);
flags.remove(Flag.NX);
flags.add(Flag.XX);
return new ZAddCommand(getKey(), tuples, flags, incr);
}
/**
@@ -136,7 +136,11 @@ public interface ReactiveZSetCommands {
* @return a new {@link ZAddCommand} with {@literal nx} applied.
*/
public ZAddCommand nx() {
return new ZAddCommand(getKey(), tuples, true, returnTotalChanged, incr, gt, lt);
EnumSet<Flag> flags = EnumSet.copyOf(this.flags);
flags.remove(Flag.XX);
flags.add(Flag.NX);
return new ZAddCommand(getKey(), tuples, flags, incr);
}
/**
@@ -146,17 +150,20 @@ public interface ReactiveZSetCommands {
* @return a new {@link ZAddCommand} with {@literal ch} applied.
*/
public ZAddCommand ch() {
return new ZAddCommand(getKey(), tuples, upsert, true, incr, gt, lt);
EnumSet<Flag> flags = EnumSet.copyOf(this.flags);
flags.add(Flag.CH);
return new ZAddCommand(getKey(), tuples, flags, incr);
}
/**
* Applies {@literal incr} mode (When this option is specified ZADD acts like ZINCRBY). Constructs a new command
* instance with all previously configured properties.
* instance with all previously configured properties. Note that the command result returns the score of the member.
*
* @return a new {@link ZAddCommand} with {@literal incr} applied.
*/
public ZAddCommand incr() {
return new ZAddCommand(getKey(), tuples, upsert, upsert, true, gt, lt);
return new ZAddCommand(getKey(), tuples, flags, true);
}
/**
@@ -166,7 +173,11 @@ public interface ReactiveZSetCommands {
* @since 2.5
*/
public ZAddCommand gt() {
return new ZAddCommand(getKey(), tuples, upsert, upsert, incr, true, lt);
EnumSet<Flag> flags = EnumSet.copyOf(this.flags);
flags.remove(Flag.LT);
flags.add(Flag.GT);
return new ZAddCommand(getKey(), tuples, flags, incr);
}
/**
@@ -176,7 +187,11 @@ public interface ReactiveZSetCommands {
* @since 2.5
*/
public ZAddCommand lt() {
return new ZAddCommand(getKey(), tuples, upsert, upsert, incr, gt, true);
EnumSet<Flag> flags = EnumSet.copyOf(this.flags);
flags.remove(Flag.GT);
flags.add(Flag.LT);
return new ZAddCommand(getKey(), tuples, flags, incr);
}
/**
@@ -187,10 +202,26 @@ public interface ReactiveZSetCommands {
}
/**
* @return
* @return {@code true} if the command does not contain NX or XX flags.
*/
public boolean isUpsert() {
return upsert;
return !flags.contains(Flag.NX) && !flags.contains(Flag.XX);
}
/**
* @return {@code true} if the command contains the XX flag.
* @since 3.1.5
*/
public boolean isIfExists() {
return flags.contains(Flag.XX);
}
/**
* @return {@code true} if the command contains the NX flag.
* @since 3.1.5
*/
public boolean isIfNotExists() {
return flags.contains(Flag.NX);
}
/**
@@ -205,7 +236,7 @@ public interface ReactiveZSetCommands {
* @since 2.5
*/
public boolean isGt() {
return gt;
return flags.contains(Flag.GT);
}
/**
@@ -213,14 +244,14 @@ public interface ReactiveZSetCommands {
* @since 2.5
*/
public boolean isLt() {
return lt;
return flags.contains(Flag.LT);
}
/**
* @return
*/
public boolean isReturnTotalChanged() {
return returnTotalChanged;
return flags.contains(Flag.CH);
}
}

View File

@@ -23,7 +23,6 @@ import io.lettuce.core.ScoredValue;
import io.lettuce.core.Value;
import io.lettuce.core.ZAddArgs;
import io.lettuce.core.ZStoreArgs;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -32,7 +31,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
@@ -86,36 +84,32 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
ZAddArgs args = null;
if (command.isIncr() || command.isUpsert() || command.isReturnTotalChanged()) {
if (command.isIncr()) {
if (command.isIncr()) {
if (command.getTuples().size() > 1) {
throw new IllegalArgumentException("ZADD INCR must not contain more than one tuple");
}
Tuple tuple = command.getTuples().iterator().next();
return reactiveCommands.zaddincr(command.getKey(), tuple.getScore(), ByteBuffer.wrap(tuple.getValue()))
.map(value -> new NumericResponse<>(command, value));
if (command.getTuples().size() > 1) {
throw new IllegalArgumentException("ZADD INCR must not contain more than one tuple");
}
if (command.isReturnTotalChanged()) {
args = ZAddArgs.Builder.ch();
}
Tuple tuple = command.getTuples().iterator().next();
if (command.isUpsert()) {
args = args == null ? ZAddArgs.Builder.nx() : args.nx();
} else {
args = args == null ? ZAddArgs.Builder.xx() : args.xx();
}
return reactiveCommands.zaddincr(command.getKey(), tuple.getScore(), ByteBuffer.wrap(tuple.getValue()))
.map(value -> new NumericResponse<>(command, value));
}
if (command.isGt()) {
args = args == null ? ZAddArgs.Builder.gt() : args.gt();
}
if (command.isLt()) {
args = args == null ? ZAddArgs.Builder.lt() : args.lt();
}
if (command.isReturnTotalChanged()) {
args = ZAddArgs.Builder.ch();
}
if (command.isIfNotExists()) {
args = args == null ? ZAddArgs.Builder.nx() : args.nx();
} else if (command.isIfExists()) {
args = args == null ? ZAddArgs.Builder.xx() : args.xx();
}
if (command.isGt()) {
args = args == null ? ZAddArgs.Builder.gt() : args.gt();
} else if (command.isLt()) {
args = args == null ? ZAddArgs.Builder.lt() : args.lt();
}
ScoredValue<ByteBuffer>[] values = (ScoredValue<ByteBuffer>[]) command.getTuples().stream()
@@ -139,8 +133,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
ByteBuffer[] values = command.getValues().toArray(ByteBuffer[]::new);
return reactiveCommands.zrem(command.getKey(), values)
.map(value -> new NumericResponse<>(command, value));
return reactiveCommands.zrem(command.getKey(), values).map(value -> new NumericResponse<>(command, value));
}));
}
@@ -178,8 +171,8 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
Assert.notNull(command.getKey(), "Key must not be null");
Flux<ScoredValue<ByteBuffer>> result =
reactiveCommands.zrandmemberWithScores(command.getKey(), command.getCount());
Flux<ScoredValue<ByteBuffer>> result = reactiveCommands.zrandmemberWithScores(command.getKey(),
command.getCount());
return new CommandResponse<>(command, result.map(this::toTuple));
}));
@@ -299,7 +292,8 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
result = reactiveCommands.zrangebyscore(command.getKey(), range).map(value -> toTuple(value, Double.NaN));
} else {
result = reactiveCommands.zrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get()))
result = reactiveCommands
.zrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get()))
.map(value -> toTuple(value, Double.NaN));
}
}
@@ -319,7 +313,8 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
result = reactiveCommands.zrevrangebyscore(command.getKey(), range)
.map(value -> toTuple(value, Double.NaN));
} else {
result = reactiveCommands.zrevrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get()))
result = reactiveCommands
.zrevrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get()))
.map(value -> toTuple(value, Double.NaN));
}
}
@@ -337,7 +332,8 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getOptions(), "ScanOptions must not be null");
Flux<Tuple> result = ScanStream.zscan(reactiveCommands, command.getKey(), LettuceConverters.toScanArgs(command.getOptions()))
Flux<Tuple> result = ScanStream
.zscan(reactiveCommands, command.getKey(), LettuceConverters.toScanArgs(command.getOptions()))
.map(this::toTuple);
return Mono.just(new CommandResponse<>(command, result));
@@ -387,8 +383,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
? reactiveCommands.zpopmin(command.getKey(), command.getCount())
: reactiveCommands.zpopmax(command.getKey(), command.getCount());
} else {
result = (command.getDirection() == PopDirection.MIN
? reactiveCommands.zpopmin(command.getKey())
result = (command.getDirection() == PopDirection.MIN ? reactiveCommands.zpopmin(command.getKey())
: reactiveCommands.zpopmax(command.getKey())).flux();
}
@@ -415,14 +410,13 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
Mono<ScoredValue<ByteBuffer>> result = commandResult.filter(Value::hasValue).map(Value::getValue);
return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple).flux());
}
else {
} else {
long timeout = command.getTimeUnit().toSeconds(command.getTimeout());
Mono<KeyValue<ByteBuffer, ScoredValue<ByteBuffer>>> commandResult = command.getDirection() == PopDirection.MIN
? reactiveCommands.bzpopmin(timeout, command.getKey())
: reactiveCommands.bzpopmax(timeout, command.getKey());
? reactiveCommands.bzpopmin(timeout, command.getKey())
: reactiveCommands.bzpopmax(timeout, command.getKey());
Mono<ScoredValue<ByteBuffer>> result = commandResult.filter(Value::hasValue).map(Value::getValue);
@@ -438,8 +432,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
Assert.notNull(command.getKey(), "Key must not be null");
return reactiveCommands.zcard(command.getKey())
.map(value -> new NumericResponse<>(command, value));
return reactiveCommands.zcard(command.getKey()).map(value -> new NumericResponse<>(command, value));
}));
}
@@ -574,8 +567,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]);
Flux<ByteBuffer> result = args != null
? reactiveCommands.zinter(args, sourceKeys)
Flux<ByteBuffer> result = args != null ? reactiveCommands.zinter(args, sourceKeys)
: reactiveCommands.zinter(sourceKeys);
return new CommandResponse<>(command, result);
@@ -599,8 +591,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]);
Flux<ScoredValue<ByteBuffer>> result = args != null
? reactiveCommands.zinterWithScores(args, sourceKeys)
Flux<ScoredValue<ByteBuffer>> result = args != null ? reactiveCommands.zinterWithScores(args, sourceKeys)
: reactiveCommands.zinterWithScores(sourceKeys);
return new CommandResponse<>(command, result.map(this::toTuple));
@@ -625,8 +616,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]);
Mono<Long> result = args != null
? reactiveCommands.zinterstore(command.getKey(), args, sourceKeys)
Mono<Long> result = args != null ? reactiveCommands.zinterstore(command.getKey(), args, sourceKeys)
: reactiveCommands.zinterstore(command.getKey(), sourceKeys);
return result.map(value -> new NumericResponse<>(command, value));
@@ -650,8 +640,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(ByteBuffer[]::new);
Flux<ByteBuffer> result = args != null
? reactiveCommands.zunion(args, sourceKeys)
Flux<ByteBuffer> result = args != null ? reactiveCommands.zunion(args, sourceKeys)
: reactiveCommands.zunion(sourceKeys);
return new CommandResponse<>(command, result);
@@ -675,8 +664,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(ByteBuffer[]::new);
Flux<ScoredValue<ByteBuffer>> result = args != null
? reactiveCommands.zunionWithScores(args, sourceKeys)
Flux<ScoredValue<ByteBuffer>> result = args != null ? reactiveCommands.zunionWithScores(args, sourceKeys)
: reactiveCommands.zunionWithScores(sourceKeys);
return new CommandResponse<>(command, result.map(this::toTuple));
@@ -701,8 +689,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(ByteBuffer[]::new);
Mono<Long> result = args != null
? reactiveCommands.zunionstore(command.getKey(), args, sourceKeys)
Mono<Long> result = args != null ? reactiveCommands.zunionstore(command.getKey(), args, sourceKeys)
: reactiveCommands.zunionstore(command.getKey(), sourceKeys);
return result.map(value -> new NumericResponse<>(command, value));