Terminate stream with error on null values returned by RedisElementReader for top-level elements.

We now emit InvalidDataAccessApiUsageException when a RedisElementReader returns null in the context of a top-level stream to indicate invalid API usage although RedisElementReader.read can generally return null values if these are being collected in a container or value wrapper or parent complex object.

Apply consistent wording to operations documentation.
This commit is contained in:
Mark Paluch
2023-08-14 11:26:59 +02:00
committed by John Blum
parent 66b00e286d
commit b5f124cfd8
22 changed files with 276 additions and 99 deletions

View File

@@ -469,7 +469,7 @@ public abstract class Converters {
* @return
* @since 2.6
*/
public static <K, V> Map.Entry<K, V> entryOf(K key, V value) {
public static <K, V> Map.Entry<K, V> entryOf(@Nullable K key, @Nullable V value) {
return new AbstractMap.SimpleImmutableEntry<>(key, value);
}

View File

@@ -39,6 +39,7 @@ import org.springframework.data.redis.domain.geo.GeoReference;
import org.springframework.data.redis.domain.geo.GeoReference.GeoMemberReference;
import org.springframework.data.redis.domain.geo.GeoShape;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
@@ -320,6 +321,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
return serializationContext.getValueSerializationPair().write(value);
}
@Nullable
private V readValue(ByteBuffer buffer) {
return serializationContext.getValueSerializationPair().read(buffer);
}

View File

@@ -26,9 +26,11 @@ import java.util.Map;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.ReactiveHashCommands;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
@@ -126,7 +128,8 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(key, "Key must not be null");
return createMono(hashCommands -> hashCommands.hRandField(rawKey(key))).map(this::readHashKey);
return template.doCreateMono(connection -> connection //
.hashCommands().hRandField(rawKey(key))).map(this::readRequiredHashKey);
}
@Override
@@ -142,7 +145,8 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(key, "Key must not be null");
return createFlux(hashCommands -> hashCommands.hRandField(rawKey(key), count)).map(this::readHashKey);
return template.doCreateFlux(connection -> connection //
.hashCommands().hRandField(rawKey(key), count)).map(this::readRequiredHashKey);
}
@Override
@@ -159,8 +163,8 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(key, "Key must not be null");
return createFlux(hashCommands -> hashCommands.hKeys(rawKey(key)) //
.map(this::readHashKey));
return createFlux(connection -> connection.hKeys(rawKey(key)) //
.map(this::readRequiredHashKey));
}
@Override
@@ -207,8 +211,8 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(key, "Key must not be null");
return createFlux(hashCommands -> hashCommands.hVals(rawKey(key)) //
.map(this::readHashValue));
return createFlux(connection -> connection.hVals(rawKey(key)) //
.map(this::readRequiredHashValue));
}
@Override
@@ -265,13 +269,37 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
}
@SuppressWarnings("unchecked")
@Nullable
private HK readHashKey(ByteBuffer value) {
return (HK) serializationContext.getHashKeySerializationPair().read(value);
}
private HK readRequiredHashKey(ByteBuffer buffer) {
HK hashKey = readHashKey(buffer);
if (hashKey == null) {
throw new InvalidDataAccessApiUsageException("Deserialized hash key is null");
}
return hashKey;
}
@SuppressWarnings("unchecked")
private HV readHashValue(ByteBuffer value) {
return (HV) (value == null ? value : serializationContext.getHashValueSerializationPair().read(value));
@Nullable
private HV readHashValue(@Nullable ByteBuffer value) {
return (HV) (value == null ? null : serializationContext.getHashValueSerializationPair().read(value));
}
private HV readRequiredHashValue(ByteBuffer buffer) {
HV hashValue = readHashValue(buffer);
if (hashValue == null) {
throw new InvalidDataAccessApiUsageException("Deserialized hash value is null");
}
return hashValue;
}
private Map.Entry<HK, HV> deserializeHashEntry(Map.Entry<ByteBuffer, ByteBuffer> source) {

View File

@@ -27,11 +27,13 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.ReactiveListCommands;
import org.springframework.data.redis.connection.ReactiveListCommands.Direction;
import org.springframework.data.redis.connection.ReactiveListCommands.LPosCommand;
import org.springframework.data.redis.connection.RedisListCommands.Position;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
@@ -59,7 +61,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createFlux(listCommands -> listCommands.lRange(rawKey(key), start, end).map(this::readValue));
return createFlux(connection -> connection.lRange(rawKey(key), start, end).map(this::readRequiredValue));
}
@Override
@@ -172,8 +174,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(from, "From direction must not be null");
Assert.notNull(to, "To direction must not be null");
return createMono(listCommands ->
listCommands.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to).map(this::readValue));
return createMono(connection -> connection.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to)
.map(this::readRequiredValue));
}
@Override
@@ -185,8 +187,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(to, "To direction must not be null");
Assert.notNull(timeout, "Timeout must not be null");
return createMono(listCommands ->
listCommands.bLMove(rawKey(sourceKey), rawKey(destinationKey), from, to, timeout).map(this::readValue));
return createMono(connection -> connection.bLMove(rawKey(sourceKey), rawKey(destinationKey), from, to, timeout)
.map(this::readRequiredValue));
}
@Override
@@ -211,7 +213,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(listCommands -> listCommands.lIndex(rawKey(key), index).map(this::readValue));
return createMono(connection -> connection.lIndex(rawKey(key), index).map(this::readRequiredValue));
}
@Override
@@ -236,7 +238,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(listCommands -> listCommands.lPop(rawKey(key)).map(this::readValue));
return createMono(connection -> connection.lPop(rawKey(key)).map(this::readRequiredValue));
}
@@ -245,7 +247,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createFlux(listCommands -> listCommands.lPop(rawKey(key), count).map(this::readValue));
return createFlux(listCommands -> listCommands.lPop(rawKey(key), count).map(this::readRequiredValue));
}
@Override
@@ -255,9 +257,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
return createMono(listCommands ->
listCommands.blPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
return createMono(connection -> connection.blPop(Collections.singletonList(rawKey(key)), timeout)
.mapNotNull(popResult -> readValue(popResult.getValue())));
}
@Override
@@ -265,7 +266,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(listCommands -> listCommands.rPop(rawKey(key)).map(this::readValue));
return createMono(listCommands -> listCommands.rPop(rawKey(key)).map(this::readRequiredValue));
}
@Override
@@ -273,7 +274,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createFlux(listCommands -> listCommands.rPop(rawKey(key), count).map(this::readValue));
return createFlux(listCommands -> listCommands.rPop(rawKey(key), count).map(this::readRequiredValue));
}
@Override
@@ -283,9 +284,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
return createMono(listCommands ->
listCommands.brPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
return createMono(connection -> connection.brPop(Collections.singletonList(rawKey(key)), timeout)
.mapNotNull(popResult -> readValue(popResult.getValue())));
}
@Override
@@ -294,8 +294,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(sourceKey, "Source key must not be null");
Assert.notNull(destinationKey, "Destination key must not be null");
return createMono(listCommands ->
listCommands.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)).map(this::readValue));
return createMono(connection -> connection.rPopLPush(rawKey(sourceKey), rawKey(destinationKey))
.map(this::readRequiredValue));
}
@Override
@@ -306,8 +306,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
return createMono(listCommands ->
listCommands.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue));
return createMono(connection -> connection.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout)
.map(this::readRequiredValue));
}
@Override
@@ -344,7 +344,19 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
return serializationContext.getValueSerializationPair().write(value);
}
@Nullable
private V readValue(ByteBuffer buffer) {
return serializationContext.getValueSerializationPair().read(buffer);
}
private V readRequiredValue(ByteBuffer buffer) {
V v = readValue(buffer);
if (v == null) {
throw new InvalidDataAccessApiUsageException("Deserialized list value is null");
}
return v;
}
}

View File

@@ -28,8 +28,10 @@ import java.util.Map;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.ReactiveSetCommands;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
@@ -89,7 +91,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
return createMono(setCommands -> setCommands.sPop(rawKey(key)).map(this::readValue));
return createMono(setCommands -> setCommands.sPop(rawKey(key)).map(this::readRequiredValue));
}
@Override
@@ -97,7 +99,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
return createFlux(setCommands -> setCommands.sPop(rawKey(key), count).map(this::readValue));
return createFlux(setCommands -> setCommands.sPop(rawKey(key), count).map(this::readRequiredValue));
}
@Override
@@ -175,7 +177,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
.map(this::rawKey) //
.collectList() //
.flatMapMany(setCommands::sInter) //
.map(this::readValue));
.map(this::readRequiredValue));
}
@Override
@@ -237,7 +239,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
.map(this::rawKey) //
.collectList() //
.flatMapMany(setCommands::sUnion) //
.map(this::readValue));
.map(this::readRequiredValue));
}
@Override
@@ -299,7 +301,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
.map(this::rawKey) //
.collectList() //
.flatMapMany(setCommands::sDiff) //
.map(this::readValue));
.map(this::readRequiredValue));
}
@Override
@@ -339,7 +341,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
return createFlux(setCommands -> setCommands.sMembers(rawKey(key)).map(this::readValue));
return createFlux(setCommands -> setCommands.sMembers(rawKey(key)).map(this::readRequiredValue));
}
@Override
@@ -348,7 +350,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
Assert.notNull(options, "ScanOptions must not be null");
return createFlux(setCommands -> setCommands.sScan(rawKey(key), options).map(this::readValue));
return createFlux(setCommands -> setCommands.sScan(rawKey(key), options).map(this::readRequiredValue));
}
@Override
@@ -356,7 +358,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
return createMono(setCommands -> setCommands.sRandMember(rawKey(key)).map(this::readValue));
return createMono(setCommands -> setCommands.sRandMember(rawKey(key)).map(this::readRequiredValue));
}
@Override
@@ -364,7 +366,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements");
return createFlux(setCommands -> setCommands.sRandMember(rawKey(key), count).map(this::readValue));
return createFlux(setCommands -> setCommands.sRandMember(rawKey(key), count).map(this::readRequiredValue));
}
@Override
@@ -372,7 +374,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements");
return createFlux(setCommands -> setCommands.sRandMember(rawKey(key), -count).map(this::readValue));
return createFlux(setCommands -> setCommands.sRandMember(rawKey(key), -count).map(this::readRequiredValue));
}
@Override
@@ -415,7 +417,19 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
return serializationContext.getValueSerializationPair().write(value);
}
@Nullable
private V readValue(ByteBuffer buffer) {
return serializationContext.getValueSerializationPair().read(buffer);
}
private V readRequiredValue(ByteBuffer buffer) {
V v = readValue(buffer);
if (v == null) {
throw new InvalidDataAccessApiUsageException("Deserialized set value is null");
}
return v;
}
}

View File

@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.BitFieldSubCommands;
import org.springframework.data.redis.connection.ReactiveNumberCommands;
import org.springframework.data.redis.connection.ReactiveStringCommands;
@@ -34,6 +35,7 @@ import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializationContext.SerializationPair;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
@@ -147,7 +149,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(stringCommands -> stringCommands.get(rawKey((K) key)) //
.map(this::readValue));
.map(this::readRequiredValue));
}
@Override
@@ -156,7 +158,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(stringCommands -> stringCommands.getDel(rawKey(key)) //
.map(this::readValue));
.map(this::readRequiredValue));
}
@Override
@@ -166,7 +168,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(timeout, "Timeout must not be null");
return createMono(stringCommands -> stringCommands.getEx(rawKey(key), Expiration.from(timeout)) //
.map(this::readValue));
.map(this::readRequiredValue));
}
@Override
@@ -175,7 +177,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(stringCommands -> stringCommands.getEx(rawKey(key), Expiration.persistent()) //
.map(this::readValue));
.map(this::readRequiredValue));
}
@Override
@@ -183,7 +185,8 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(stringCommands -> stringCommands.getSet(rawKey(key), rawValue(value)).map(value()::read));
return createMono(stringCommands -> stringCommands.getSet(rawKey(key), rawValue(value))
.mapNotNull(value()::read));
}
@Override
@@ -252,7 +255,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(stringCommands -> stringCommands.getRange(rawKey(key), start, end) //
.map(stringSerializationPair()::read));
.mapNotNull(stringSerializationPair()::read));
}
@Override
@@ -326,10 +329,22 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
return serializationContext.getValueSerializationPair().write(value);
}
@Nullable
private V readValue(ByteBuffer buffer) {
return serializationContext.getValueSerializationPair().read(buffer);
}
private V readRequiredValue(ByteBuffer buffer) {
V v = readValue(buffer);
if (v == null) {
throw new InvalidDataAccessApiUsageException("Deserialized value is null");
}
return v;
}
private SerializationPair<String> stringSerializationPair() {
return serializationContext.getStringSerializationPair();
}
@@ -357,4 +372,5 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
return result;
}
}

View File

@@ -27,7 +27,7 @@ import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.ReactiveZSetCommands;
@@ -38,6 +38,7 @@ import org.springframework.data.redis.connection.zset.Weights;
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
@@ -110,7 +111,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(zSetCommands -> zSetCommands.zRandMember(rawKey(key))).map(this::readValue);
return createMono(zSetCommands -> zSetCommands.zRandMember(rawKey(key))).map(this::readRequiredValue);
}
@Override
@@ -119,7 +120,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements");
return createFlux(zSetCommands -> zSetCommands.zRandMember(rawKey(key), count)).map(this::readValue);
return createFlux(zSetCommands -> zSetCommands.zRandMember(rawKey(key), count)).map(this::readRequiredValue);
}
@Override
@@ -128,7 +129,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements");
return createFlux(zSetCommands -> zSetCommands.zRandMember(rawKey(key), -count)).map(this::readValue);
return createFlux(zSetCommands -> zSetCommands.zRandMember(rawKey(key), -count)).map(this::readRequiredValue);
}
@Override
@@ -181,7 +182,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(zSetCommands -> zSetCommands.zRange(rawKey(key), range).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRange(rawKey(key), range).map(this::readRequiredValue));
}
@Override
@@ -199,7 +200,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(zSetCommands -> zSetCommands.zRangeByScore(rawKey(key), range).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRangeByScore(rawKey(key), range).map(this::readRequiredValue));
}
@Override
@@ -218,7 +219,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(zSetCommands -> zSetCommands.zRangeByScore(rawKey(key), range, limit).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRangeByScore(rawKey(key), range, limit)
.map(this::readRequiredValue));
}
@Override
@@ -238,7 +240,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(zSetCommands -> zSetCommands.zRevRange(rawKey(key), range).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRevRange(rawKey(key), range).map(this::readRequiredValue));
}
@Override
@@ -257,7 +259,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(zSetCommands -> zSetCommands.zRevRangeByScore(rawKey(key), range).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRevRangeByScore(rawKey(key), range)
.map(this::readRequiredValue));
}
@Override
@@ -276,8 +279,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(zSetCommands ->
zSetCommands.zRevRangeByScore(rawKey(key), range, limit).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRevRangeByScore(rawKey(key), range, limit)
.map(this::readRequiredValue));
}
@Override
@@ -481,7 +484,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(zSetCommands::zDiff).map(this::readValue));
.flatMapMany(zSetCommands::zDiff).map(this::readRequiredValue));
}
@Override
@@ -519,7 +522,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(zSetCommands::zInter).map(this::readValue));
.flatMapMany(zSetCommands::zInter).map(this::readRequiredValue));
}
@Override
@@ -587,7 +590,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(zSetCommands::zUnion).map(this::readValue));
.flatMapMany(zSetCommands::zUnion) //
.map(this::readRequiredValue));
}
@Override
@@ -660,7 +664,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(zSetCommands -> zSetCommands.zRangeByLex(rawKey(key), range).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRangeByLex(rawKey(key), range).map(this::readRequiredValue));
}
@Override
@@ -670,7 +674,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(range, "Range must not be null");
Assert.notNull(limit, "Limit must not be null");
return createFlux(zSetCommands -> zSetCommands.zRangeByLex(rawKey(key), range, limit).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRangeByLex(rawKey(key), range, limit).map(this::readRequiredValue));
}
@Override
@@ -679,7 +683,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(zSetCommands -> zSetCommands.zRevRangeByLex(rawKey(key), range).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRevRangeByLex(rawKey(key), range).map(this::readRequiredValue));
}
@Override
@@ -689,7 +693,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(range, "Range must not be null");
Assert.notNull(limit, "Limit must not be null");
return createFlux(zSetCommands -> zSetCommands.zRevRangeByLex(rawKey(key), range, limit).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRevRangeByLex(rawKey(key), range, limit)
.map(this::readRequiredValue));
}
@Override
@@ -732,10 +737,22 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
return serializationContext.getValueSerializationPair().write(value);
}
@Nullable
private V readValue(ByteBuffer buffer) {
return serializationContext.getValueSerializationPair().read(buffer);
}
private V readRequiredValue(ByteBuffer buffer) {
V v = readValue(buffer);
if (v == null) {
throw new InvalidDataAccessApiUsageException("Deserialized sorted set value is null");
}
return v;
}
private TypedTuple<V> readTypedTuple(Tuple raw) {
return new DefaultTypedTuple<>(readValue(ByteBuffer.wrap(raw.getValue())), raw.getScore());
}

View File

@@ -35,7 +35,7 @@ import org.springframework.data.redis.domain.geo.GeoReference;
import org.springframework.data.redis.domain.geo.GeoShape;
/**
* Reactive Redis operations for geo commands.
* Reactive Redis operations for Geo Commands.
*
* @author Mark Paluch
* @author Christoph Strobl
@@ -289,8 +289,7 @@ public interface ReactiveGeoOperations<K, M> {
* @since 2.6
* @see <a href="https://redis.io/commands/geosearch">Redis Documentation: GEOSEARCH</a>
*/
default Flux<GeoResult<GeoLocation<M>>> search(K key, GeoReference<M> reference,
BoundingBox boundingBox) {
default Flux<GeoResult<GeoLocation<M>>> search(K key, GeoReference<M> reference, BoundingBox boundingBox) {
return search(key, reference, boundingBox, GeoSearchCommandArgs.newGeoSearchArgs());
}
@@ -306,8 +305,8 @@ public interface ReactiveGeoOperations<K, M> {
* @since 2.6
* @see <a href="https://redis.io/commands/geosearch">Redis Documentation: GEOSEARCH</a>
*/
default Flux<GeoResult<GeoLocation<M>>> search(K key, GeoReference<M> reference,
BoundingBox boundingBox, GeoSearchCommandArgs args) {
default Flux<GeoResult<GeoLocation<M>>> search(K key, GeoReference<M> reference, BoundingBox boundingBox,
GeoSearchCommandArgs args) {
return search(key, reference, GeoShape.byBox(boundingBox), args);
}
@@ -383,8 +382,7 @@ public interface ReactiveGeoOperations<K, M> {
* @since 2.6
* @see <a href="https://redis.io/commands/geosearchstore">Redis Documentation: GEOSEARCHSTORE</a>
*/
default Mono<Long> searchAndStore(K key, K destKey, GeoReference<M> reference,
BoundingBox boundingBox) {
default Mono<Long> searchAndStore(K key, K destKey, GeoReference<M> reference, BoundingBox boundingBox) {
return searchAndStore(key, destKey, reference, boundingBox, GeoSearchStoreCommandArgs.newGeoSearchStoreArgs());
}
@@ -400,8 +398,8 @@ public interface ReactiveGeoOperations<K, M> {
* @since 2.6
* @see <a href="https://redis.io/commands/geosearchstore">Redis Documentation: GEOSEARCHSTORE</a>
*/
default Mono<Long> searchAndStore(K key, K destKey, GeoReference<M> reference,
BoundingBox boundingBox, GeoSearchStoreCommandArgs args) {
default Mono<Long> searchAndStore(K key, K destKey, GeoReference<M> reference, BoundingBox boundingBox,
GeoSearchStoreCommandArgs args) {
return searchAndStore(key, destKey, reference, GeoShape.byBox(boundingBox), args);
}

View File

@@ -18,12 +18,18 @@ package org.springframework.data.redis.core;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* Redis map specific operations working on a hash.
* Reactive Redis operations for Hash Commands.
* <p>
* Streams of methods returning {@code Mono<K>} or {@code Flux<M>} are terminated with
* {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
* {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
* particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl

View File

@@ -18,7 +18,7 @@ package org.springframework.data.redis.core;
import reactor.core.publisher.Mono;
/**
* Redis cardinality specific operations working on a HyperLogLog multiset.
* Reactive Redis operations for working on a HyperLogLog multiset.
*
* @author Mark Paluch
* @since 2.0

View File

@@ -20,6 +20,7 @@ import static org.springframework.data.redis.connection.ReactiveListCommands.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
@@ -28,7 +29,12 @@ import org.springframework.data.redis.core.ListOperations.MoveTo;
import org.springframework.util.Assert;
/**
* Redis list specific operations.
* Reactive Redis operations for List Commands.
* <p>
* Streams of methods returning {@code Mono<K>} or {@code Flux<M>} are terminated with
* {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
* {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
* particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl

View File

@@ -18,6 +18,7 @@ package org.springframework.data.redis.core;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
@@ -41,6 +42,11 @@ import org.springframework.util.Assert;
/**
* Interface that specified a basic set of Redis operations, implemented by {@link ReactiveRedisTemplate}. Not often
* used but a useful option for extensibility and testability (as it can be easily mocked or stubbed).
* <p>
* Streams of methods returning {@code Mono<K>} or {@code Flux<M>} are terminated with
* {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
* {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
* particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl

View File

@@ -27,7 +27,7 @@ import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
@@ -55,6 +55,11 @@ import org.springframework.util.ClassUtils;
* <p>
* Note that while the template is generified, it is up to the serializers/deserializers to properly convert the given
* Objects to and from binary data.
* <p>
* Streams of methods returning {@code Mono<K>} or {@code Flux<M>} are terminated with
* {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
* {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
* particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl
@@ -331,7 +336,7 @@ public class ReactiveRedisTemplate<K, V> implements ReactiveRedisOperations<K, V
return doCreateFlux(connection -> connection.keyCommands().keys(rawKey(pattern))) //
.flatMap(Flux::fromIterable) //
.map(this::readKey);
.map(this::readRequiredKey);
}
@Override
@@ -340,12 +345,12 @@ public class ReactiveRedisTemplate<K, V> implements ReactiveRedisOperations<K, V
Assert.notNull(options, "ScanOptions must not be null");
return doCreateFlux(connection -> connection.keyCommands().scan(options)) //
.map(this::readKey);
.map(this::readRequiredKey);
}
@Override
public Mono<K> randomKey() {
return doCreateMono(connection -> connection.keyCommands().randomKey()).map(this::readKey);
return doCreateMono(connection -> connection.keyCommands().randomKey()).map(this::readRequiredKey);
}
@Override
@@ -666,7 +671,19 @@ public class ReactiveRedisTemplate<K, V> implements ReactiveRedisOperations<K, V
return getSerializationContext().getKeySerializationPair().getWriter().write(key);
}
@Nullable
private K readKey(ByteBuffer buffer) {
return getSerializationContext().getKeySerializationPair().getReader().read(buffer);
}
private K readRequiredKey(ByteBuffer buffer) {
K key = readKey(buffer);
if (key == null) {
throw new InvalidDataAccessApiUsageException("Deserialized key is null");
}
return key;
}
}

View File

@@ -18,11 +18,17 @@ package org.springframework.data.redis.core;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
/**
* Redis set specific operations.
* Reactive Redis operations for Set Commands.
* <p>
* Streams of methods returning {@code Mono<K>} or {@code Flux<M>} are terminated with
* {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
* {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
* particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl

View File

@@ -23,32 +23,20 @@ import java.util.Arrays;
import java.util.Map;
import org.reactivestreams.Publisher;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Redis stream specific operations.
* Reactive Redis operations for Stream Commands.
*
* @author Mark Paluch
* @author Christoph Strobl

View File

@@ -17,6 +17,7 @@ package org.springframework.data.redis.core;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
@@ -26,6 +27,11 @@ import org.springframework.data.redis.connection.BitFieldSubCommands;
/**
* Reactive Redis operations for simple (or in Redis terminology 'string') values.
* <p>
* Streams of methods returning {@code Mono<K>} or {@code Flux<M>} are terminated with
* {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
* {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
* particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Jiahe Cai

View File

@@ -18,6 +18,7 @@ package org.springframework.data.redis.core;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
@@ -32,7 +33,12 @@ import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
import org.springframework.lang.Nullable;
/**
* Redis ZSet/sorted set specific operations.
* Reactive Redis operations for Sorted (ZSet) Commands.
* <p>
* Streams of methods returning {@code Mono<K>} or {@code Flux<M>} are terminated with
* {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
* {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
* particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl

View File

@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Stream;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
@@ -79,7 +80,6 @@ public class DefaultReactiveScriptExecutor<K> implements ReactiveScriptExecutor<
}
@Override
@SuppressWarnings("unchecked")
public <T> Flux<T> execute(RedisScript<T> script, List<K> keys, List<?> args, RedisElementWriter<?> argsWriter,
RedisElementReader<T> resultReader) {
@@ -134,7 +134,16 @@ public class DefaultReactiveScriptExecutor<K> implements ReactiveScriptExecutor<
}
protected <T> Flux<T> deserializeResult(RedisElementReader<T> reader, Flux<T> result) {
return result.map(it -> ScriptUtils.deserializeResult(reader, it));
return result.map(it -> {
T value = ScriptUtils.deserializeResult(reader, it);
if (value == null) {
throw new InvalidDataAccessApiUsageException("Deserialized script result is null");
}
return value;
});
}
protected SerializationPair<K> keySerializer() {

View File

@@ -17,6 +17,7 @@ package org.springframework.data.redis.core.script;
import reactor.core.publisher.Flux;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -26,6 +27,11 @@ import org.springframework.data.redis.serializer.RedisSerializer;
/**
* Executes {@link RedisScript}s using reactive infrastructure.
* <p>
* Streams of methods returning {@code Mono<K>} or {@code Flux<M>} are terminated with
* {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
* {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
* particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl

View File

@@ -22,6 +22,7 @@ import java.util.List;
import org.springframework.dao.NonTransientDataAccessException;
import org.springframework.data.redis.serializer.RedisElementReader;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.lang.Nullable;
/**
* Utilities for Lua script execution and result deserialization.
@@ -71,6 +72,7 @@ class ScriptUtils {
* @param result must not be {@literal null}.
* @return the deserialized result.
*/
@Nullable
@SuppressWarnings({ "unchecked" })
static <T> T deserializeResult(RedisElementReader<T> reader, Object result) {

View File

@@ -17,6 +17,7 @@ package org.springframework.data.redis.serializer;
import java.nio.ByteBuffer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
@@ -269,8 +270,9 @@ public interface RedisSerializationContext<K, V> {
* Deserialize a {@link ByteBuffer} into the according type.
*
* @param buffer must not be {@literal null}.
* @return the deserialized value.
* @return the deserialized value. Can be {@literal null}.
*/
@Nullable
default T read(ByteBuffer buffer) {
return getReader().read(buffer);
}