@@ -91,7 +91,8 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
|
||||
|
||||
Mono<List<GeoLocation<ByteBuffer>>> serializedList = Flux
|
||||
.fromIterable(() -> memberCoordinateMap.entrySet().iterator())
|
||||
.map(entry -> new GeoLocation<>(rawValue(entry.getKey()), entry.getValue())).collectList();
|
||||
.map(entry -> new GeoLocation<>(rawValue(entry.getKey()), entry.getValue()))
|
||||
.collectList();
|
||||
|
||||
return serializedList.flatMap(list -> geoCommands.geoAdd(rawKey(key), list));
|
||||
});
|
||||
@@ -106,7 +107,8 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
|
||||
return createMono(geoCommands -> {
|
||||
|
||||
Mono<List<GeoLocation<ByteBuffer>>> serializedList = Flux.fromIterable(geoLocations)
|
||||
.map(location -> new GeoLocation<>(rawValue(location.getName()), location.getPoint())).collectList();
|
||||
.map(location -> new GeoLocation<>(rawValue(location.getName()), location.getPoint()))
|
||||
.collectList();
|
||||
|
||||
return serializedList.flatMap(list -> geoCommands.geoAdd(rawKey(key), list));
|
||||
});
|
||||
@@ -220,7 +222,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
|
||||
|
||||
return createFlux(geoCommands ->
|
||||
geoCommands.geoRadiusByMember(rawKey(key), rawValue(member), new Distance(radius)) //
|
||||
.map(this::readGeoResult));
|
||||
.map(this::readGeoResult));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -265,7 +267,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
|
||||
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
|
||||
return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0);
|
||||
return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(count -> count != 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -274,10 +276,11 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
|
||||
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
Assert.notNull(reference, "GeoReference must not be null");
|
||||
|
||||
GeoReference<ByteBuffer> rawReference = getGeoReference(reference);
|
||||
|
||||
return createFlux(geoCommands -> geoCommands
|
||||
.geoSearch(rawKey(key), rawReference, geoPredicate, args).map(this::readGeoResult));
|
||||
return createFlux(geoCommands -> geoCommands.geoSearch(rawKey(key), rawReference, geoPredicate, args)
|
||||
.map(this::readGeoResult));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -286,6 +289,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
|
||||
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
Assert.notNull(reference, "GeoReference must not be null");
|
||||
|
||||
GeoReference<ByteBuffer> rawReference = getGeoReference(reference);
|
||||
|
||||
return createMono(geoCommands -> geoCommands.geoSearchStore(rawKey(destKey), rawKey(key),
|
||||
|
||||
@@ -38,6 +38,7 @@ import org.springframework.util.Assert;
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
* @author John Blum
|
||||
* @since 2.0
|
||||
*/
|
||||
class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations<H, HK, HV> {
|
||||
@@ -62,7 +63,8 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
|
||||
Assert.noNullElements(hashKeys, "Hash keys must not contain null elements");
|
||||
|
||||
return createMono(hashCommands -> Flux.fromArray(hashKeys) //
|
||||
.map(o -> (HK) o).map(this::rawHashKey) //
|
||||
.map(hashKey -> (HK) hashKey)
|
||||
.map(this::rawHashKey) //
|
||||
.collectList() //
|
||||
.flatMap(hks -> hashCommands.hDel(rawKey(key), hks)));
|
||||
}
|
||||
@@ -84,8 +86,8 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
Assert.notNull(hashKey, "Hash key must not be null");
|
||||
|
||||
return createMono(hashCommands ->
|
||||
hashCommands.hGet(rawKey(key), rawHashKey((HK) hashKey)).map(this::readHashValue));
|
||||
return createMono(hashCommands -> hashCommands.hGet(rawKey(key), rawHashKey((HK) hashKey))
|
||||
.map(this::readHashValue));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -107,8 +109,7 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
Assert.notNull(hashKey, "Hash key must not be null");
|
||||
|
||||
return template.doCreateMono(connection -> connection //
|
||||
.numberCommands() //
|
||||
return template.doCreateMono(connection -> connection.numberCommands()
|
||||
.hIncrBy(rawKey(key), rawHashKey(hashKey), delta));
|
||||
}
|
||||
|
||||
@@ -118,8 +119,7 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
Assert.notNull(hashKey, "Hash key must not be null");
|
||||
|
||||
return template.doCreateMono(connection -> connection //
|
||||
.numberCommands() //
|
||||
return template.doCreateMono(connection -> connection.numberCommands()
|
||||
.hIncrBy(rawKey(key), rawHashKey(hashKey), delta));
|
||||
}
|
||||
|
||||
@@ -128,8 +128,8 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
|
||||
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
|
||||
return template.doCreateMono(connection -> connection //
|
||||
.hashCommands().hRandField(rawKey(key))).map(this::readRequiredHashKey);
|
||||
return template.doCreateMono(connection -> connection.hashCommands().hRandField(rawKey(key)))
|
||||
.map(this::readRequiredHashKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -137,7 +137,8 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
|
||||
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
|
||||
return createMono(hashCommands ->hashCommands.hRandFieldWithValues(rawKey(key))).map(this::deserializeHashEntry);
|
||||
return createMono(hashCommands -> hashCommands.hRandFieldWithValues(rawKey(key)))
|
||||
.map(this::deserializeHashEntry);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -145,8 +146,8 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
|
||||
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
|
||||
return template.doCreateFlux(connection -> connection //
|
||||
.hashCommands().hRandField(rawKey(key), count)).map(this::readRequiredHashKey);
|
||||
return template.doCreateFlux(connection -> connection.hashCommands().hRandField(rawKey(key), count))
|
||||
.map(this::readRequiredHashKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -154,8 +155,8 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
|
||||
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
|
||||
return template.doCreateFlux(connection -> connection //
|
||||
.hashCommands().hRandFieldWithValues(rawKey(key), count)).map(this::deserializeHashEntry);
|
||||
return template.doCreateFlux(connection -> connection.hashCommands().hRandFieldWithValues(rawKey(key), count))
|
||||
.map(this::deserializeHashEntry);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -211,7 +212,7 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
|
||||
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
|
||||
return createFlux(connection -> connection.hVals(rawKey(key)) //
|
||||
return createFlux(hashCommands -> hashCommands.hVals(rawKey(key)) //
|
||||
.map(this::readRequiredHashValue));
|
||||
}
|
||||
|
||||
@@ -278,28 +279,28 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
|
||||
|
||||
HK hashKey = readHashKey(buffer);
|
||||
|
||||
if (hashKey == null) {
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized hash key is null");
|
||||
if (hashKey != null) {
|
||||
return hashKey;
|
||||
}
|
||||
|
||||
return hashKey;
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized hash key is null");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
private HV readHashValue(@Nullable ByteBuffer value) {
|
||||
return (HV) (value == null ? null : serializationContext.getHashValueSerializationPair().read(value));
|
||||
return value != null ? (HV) serializationContext.getHashValueSerializationPair().read(value) : null;
|
||||
}
|
||||
|
||||
private HV readRequiredHashValue(ByteBuffer buffer) {
|
||||
|
||||
HV hashValue = readHashValue(buffer);
|
||||
|
||||
if (hashValue == null) {
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized hash value is null");
|
||||
if (hashValue != null) {
|
||||
return hashValue;
|
||||
}
|
||||
|
||||
return hashValue;
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized hash value is null");
|
||||
}
|
||||
|
||||
private Map.Entry<HK, HV> deserializeHashEntry(Map.Entry<ByteBuffer, ByteBuffer> source) {
|
||||
@@ -309,9 +310,11 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
|
||||
private List<HV> deserializeHashValues(List<ByteBuffer> source) {
|
||||
|
||||
List<HV> values = new ArrayList<>(source.size());
|
||||
|
||||
for (ByteBuffer byteBuffer : source) {
|
||||
values.add(readHashValue(byteBuffer));
|
||||
}
|
||||
|
||||
return values;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,12 +351,12 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
|
||||
|
||||
private V readRequiredValue(ByteBuffer buffer) {
|
||||
|
||||
V v = readValue(buffer);
|
||||
V value = readValue(buffer);
|
||||
|
||||
if (v == null) {
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized list value is null");
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
|
||||
return v;
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized list value is null");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@ import org.springframework.util.Assert;
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
* @author Roman Bezpalko
|
||||
* @author John Blum
|
||||
* @since 2.0
|
||||
*/
|
||||
class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V> {
|
||||
@@ -424,12 +425,12 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
|
||||
|
||||
private V readRequiredValue(ByteBuffer buffer) {
|
||||
|
||||
V v = readValue(buffer);
|
||||
V value = readValue(buffer);
|
||||
|
||||
if (v == null) {
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized set value is null");
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
|
||||
return v;
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized set value is null");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,6 +44,7 @@ import org.springframework.util.Assert;
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
* @author Jiahe Cai
|
||||
* @author John Blum
|
||||
* @since 2.0
|
||||
*/
|
||||
class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K, V> {
|
||||
@@ -336,13 +337,13 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
|
||||
|
||||
private V readRequiredValue(ByteBuffer buffer) {
|
||||
|
||||
V v = readValue(buffer);
|
||||
V value = readValue(buffer);
|
||||
|
||||
if (v == null) {
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized value is null");
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
|
||||
return v;
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized value is null");
|
||||
}
|
||||
|
||||
private SerializationPair<String> stringSerializationPair() {
|
||||
@@ -372,5 +373,4 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ import org.springframework.data.redis.connection.zset.Weights;
|
||||
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
|
||||
import org.springframework.data.redis.serializer.RedisSerializationContext;
|
||||
import org.springframework.data.redis.util.ByteUtils;
|
||||
import org.springframework.data.redis.util.RedisAssertions;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
@@ -744,13 +745,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
|
||||
|
||||
private V readRequiredValue(ByteBuffer buffer) {
|
||||
|
||||
V v = readValue(buffer);
|
||||
|
||||
if (v == null) {
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized sorted set value is null");
|
||||
}
|
||||
|
||||
return v;
|
||||
return RedisAssertions.requireNonNull(readValue(buffer),
|
||||
() -> new InvalidDataAccessApiUsageException("Deserialized sorted set value is null"));
|
||||
}
|
||||
|
||||
private TypedTuple<V> readTypedTuple(Tuple raw) {
|
||||
|
||||
@@ -42,6 +42,7 @@ import org.springframework.util.Assert;
|
||||
* @author Wongoo (望哥)
|
||||
* @author Andrey Shlykov
|
||||
* @author Shyngys Sapraliyev
|
||||
* @author John Blum
|
||||
*/
|
||||
class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZSetOperations<K, V> {
|
||||
|
||||
@@ -54,6 +55,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
byte[] rawValue = rawValue(value);
|
||||
|
||||
return execute(connection -> connection.zAdd(rawKey, score, rawValue));
|
||||
}
|
||||
|
||||
@@ -74,6 +76,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
byte[] rawValue = rawValue(value);
|
||||
|
||||
return execute(connection -> connection.zAdd(rawKey, score, rawValue, args));
|
||||
}
|
||||
|
||||
@@ -82,6 +85,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
Set<Tuple> rawValues = rawTupleValues(tuples);
|
||||
|
||||
return execute(connection -> connection.zAdd(rawKey, rawValues));
|
||||
}
|
||||
|
||||
@@ -102,6 +106,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
Set<Tuple> rawValues = rawTupleValues(tuples);
|
||||
|
||||
return execute(connection -> connection.zAdd(rawKey, rawValues, args));
|
||||
}
|
||||
|
||||
@@ -110,6 +115,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
byte[] rawValue = rawValue(value);
|
||||
|
||||
return execute(connection -> connection.zIncrBy(rawKey, delta, rawValue));
|
||||
}
|
||||
|
||||
@@ -127,8 +133,8 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements");
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
List<byte[]> result = execute(connection -> connection.zRandMember(rawKey, count));
|
||||
|
||||
return result != null ? deserializeValues(new LinkedHashSet<>(result)) : null;
|
||||
}
|
||||
|
||||
@@ -138,8 +144,8 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements");
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
List<byte[]> result = execute(connection -> connection.zRandMember(rawKey, count));
|
||||
|
||||
return deserializeValues(result);
|
||||
}
|
||||
|
||||
@@ -157,8 +163,8 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements");
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
List<Tuple> result = execute(connection -> connection.zRandMemberWithScore(rawKey, count));
|
||||
|
||||
return result != null ? deserializeTupleValues(new LinkedHashSet<>(result)) : null;
|
||||
}
|
||||
|
||||
@@ -168,8 +174,8 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements");
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
List<Tuple> result = execute(connection -> connection.zRandMemberWithScore(rawKey, count));
|
||||
|
||||
return result != null ? deserializeTupleValues(result) : null;
|
||||
}
|
||||
|
||||
@@ -229,29 +235,37 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
@Override
|
||||
public Long rangeAndStoreByLex(K srcKey, K dstKey, Range<String> range, Limit limit) {
|
||||
|
||||
byte[] rawDstKey = rawKey(dstKey);
|
||||
byte[] rawSrcKey = rawKey(srcKey);
|
||||
|
||||
return execute(connection -> connection.zRangeStoreByLex(rawDstKey, rawSrcKey, serialize(range), limit));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long reverseRangeAndStoreByLex(K srcKey, K dstKey, Range<String> range, Limit limit) {
|
||||
|
||||
byte[] rawDstKey = rawKey(dstKey);
|
||||
byte[] rawSrcKey = rawKey(srcKey);
|
||||
|
||||
return execute(connection -> connection.zRangeStoreRevByLex(rawDstKey, rawSrcKey, serialize(range), limit));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long rangeAndStoreByScore(K srcKey, K dstKey, Range<? extends Number> range, Limit limit) {
|
||||
|
||||
byte[] rawDstKey = rawKey(dstKey);
|
||||
byte[] rawSrcKey = rawKey(srcKey);
|
||||
|
||||
return execute(connection -> connection.zRangeStoreByScore(rawDstKey, rawSrcKey, range, limit));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long reverseRangeAndStoreByScore(K srcKey, K dstKey, Range<? extends Number> range, Limit limit) {
|
||||
|
||||
byte[] rawDstKey = rawKey(dstKey);
|
||||
byte[] rawSrcKey = rawKey(srcKey);
|
||||
|
||||
return execute(connection -> connection.zRangeStoreRevByScore(rawDstKey, rawSrcKey, range, limit));
|
||||
}
|
||||
|
||||
@@ -322,8 +336,8 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
public Set<TypedTuple<V>> reverseRangeByScoreWithScores(K key, double min, double max, long offset, long count) {
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
Set<Tuple> rawValues = execute(
|
||||
connection -> connection.zRevRangeByScoreWithScores(rawKey, min, max, offset, count));
|
||||
Set<Tuple> rawValues = execute(connection ->
|
||||
connection.zRevRangeByScoreWithScores(rawKey, min, max, offset, count));
|
||||
|
||||
return deserializeTupleValues(rawValues);
|
||||
}
|
||||
@@ -365,6 +379,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
public Long removeRange(K key, long start, long end) {
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
return execute(connection -> connection.zRemRange(rawKey, start, end));
|
||||
}
|
||||
|
||||
@@ -372,6 +387,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
public Long removeRangeByLex(K key, Range<String> range) {
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
return execute(connection -> connection.zRemRangeByLex(rawKey, serialize(range)));
|
||||
}
|
||||
|
||||
@@ -379,6 +395,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
public Long removeRangeByScore(K key, double min, double max) {
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
return execute(connection -> connection.zRemRangeByScore(rawKey, min, max));
|
||||
}
|
||||
|
||||
@@ -387,6 +404,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
byte[] rawValue = rawValue(o);
|
||||
|
||||
return execute(connection -> connection.zScore(rawKey, rawValue));
|
||||
}
|
||||
|
||||
@@ -395,6 +413,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
byte[][] rawValues = rawValues(o);
|
||||
|
||||
return execute(connection -> connection.zMScore(rawKey, rawValues));
|
||||
}
|
||||
|
||||
@@ -402,6 +421,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
public Long count(K key, double min, double max) {
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
return execute(connection -> connection.zCount(rawKey, min, max));
|
||||
}
|
||||
|
||||
@@ -409,6 +429,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
public Long lexCount(K key, Range<String> range) {
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
return execute(connection -> connection.zLexCount(rawKey, serialize(range)));
|
||||
}
|
||||
|
||||
@@ -417,6 +438,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
public TypedTuple<V> popMin(K key) {
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
return deserializeTuple(execute(connection -> connection.zPopMin(rawKey)));
|
||||
}
|
||||
|
||||
@@ -426,6 +448,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
Set<Tuple> result = execute(connection -> connection.zPopMin(rawKey, count));
|
||||
|
||||
return deserializeTupleValues(new LinkedHashSet<>(result));
|
||||
}
|
||||
|
||||
@@ -434,6 +457,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
public TypedTuple<V> popMin(K key, long timeout, TimeUnit unit) {
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
return deserializeTuple(execute(connection -> connection.bZPopMin(rawKey, timeout, unit)));
|
||||
}
|
||||
|
||||
@@ -442,6 +466,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
public TypedTuple<V> popMax(K key) {
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
return deserializeTuple(execute(connection -> connection.zPopMax(rawKey)));
|
||||
}
|
||||
|
||||
@@ -451,6 +476,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
Set<Tuple> result = execute(connection -> connection.zPopMax(rawKey, count));
|
||||
|
||||
return deserializeTupleValues(new LinkedHashSet<>(result));
|
||||
}
|
||||
|
||||
@@ -459,6 +485,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
public TypedTuple<V> popMax(K key, long timeout, TimeUnit unit) {
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
return deserializeTuple(execute(connection -> connection.bZPopMax(rawKey, timeout, unit)));
|
||||
}
|
||||
|
||||
@@ -471,6 +498,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
public Long zCard(K key) {
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
return execute(connection -> connection.zCard(rawKey));
|
||||
}
|
||||
|
||||
@@ -479,6 +507,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[][] rawKeys = rawKeys(key, otherKeys);
|
||||
Set<byte[]> rawValues = execute(connection -> connection.zDiff(rawKeys));
|
||||
|
||||
return deserializeValues(rawValues);
|
||||
}
|
||||
|
||||
@@ -487,6 +516,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[][] rawKeys = rawKeys(key, otherKeys);
|
||||
Set<Tuple> result = execute(connection -> connection.zDiffWithScores(rawKeys));
|
||||
|
||||
return deserializeTupleValues(new LinkedHashSet<>(result));
|
||||
}
|
||||
|
||||
@@ -504,6 +534,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[][] rawKeys = rawKeys(key, otherKeys);
|
||||
Set<byte[]> rawValues = execute(connection -> connection.zInter(rawKeys));
|
||||
|
||||
return deserializeValues(rawValues);
|
||||
}
|
||||
|
||||
@@ -512,6 +543,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[][] rawKeys = rawKeys(key, otherKeys);
|
||||
Set<Tuple> result = execute(connection -> connection.zInterWithScores(rawKeys));
|
||||
|
||||
return deserializeTupleValues(result);
|
||||
}
|
||||
|
||||
@@ -520,6 +552,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[][] rawKeys = rawKeys(key, otherKeys);
|
||||
Set<Tuple> result = execute(connection -> connection.zInterWithScores(aggregate, weights, rawKeys));
|
||||
|
||||
return deserializeTupleValues(result);
|
||||
}
|
||||
|
||||
@@ -551,6 +584,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[][] rawKeys = rawKeys(key, otherKeys);
|
||||
Set<byte[]> rawValues = execute(connection -> connection.zUnion(rawKeys));
|
||||
|
||||
return deserializeValues(rawValues);
|
||||
}
|
||||
|
||||
@@ -559,6 +593,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[][] rawKeys = rawKeys(key, otherKeys);
|
||||
Set<Tuple> result = execute(connection -> connection.zUnionWithScores(rawKeys));
|
||||
|
||||
return deserializeTupleValues(result);
|
||||
}
|
||||
|
||||
@@ -567,6 +602,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
|
||||
byte[][] rawKeys = rawKeys(key, otherKeys);
|
||||
Set<Tuple> result = execute(connection -> connection.zUnionWithScores(aggregate, weights, rawKeys));
|
||||
|
||||
return deserializeTupleValues(result);
|
||||
}
|
||||
|
||||
@@ -605,6 +641,7 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
public Set<byte[]> rangeByScore(K key, String min, String max) {
|
||||
|
||||
byte[] rawKey = rawKey(key);
|
||||
|
||||
return execute(connection -> connection.zRangeByScore(rawKey, min, max));
|
||||
}
|
||||
|
||||
@@ -632,5 +669,4 @@ class DefaultZSetOperations<K, V> extends AbstractOperations<K, V> implements ZS
|
||||
.map(it -> source.isInclusive() ? Range.Bound.inclusive(it) : Range.Bound.exclusive(it))
|
||||
.orElseGet(Range.Bound::unbounded);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -64,9 +64,10 @@ import org.springframework.util.ClassUtils;
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
* @author Petromir Dzhunev
|
||||
* @since 2.0
|
||||
* @author John Blum
|
||||
* @param <K> the Redis key type against which the template works (usually a String)
|
||||
* @param <V> the Redis value type against which the template works
|
||||
* @since 2.0
|
||||
*/
|
||||
public class ReactiveRedisTemplate<K, V> implements ReactiveRedisOperations<K, V> {
|
||||
|
||||
@@ -680,10 +681,10 @@ public class ReactiveRedisTemplate<K, V> implements ReactiveRedisOperations<K, V
|
||||
|
||||
K key = readKey(buffer);
|
||||
|
||||
if (key == null) {
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized key is null");
|
||||
if (key != null) {
|
||||
return key;
|
||||
}
|
||||
|
||||
return key;
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized key is null");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,11 +26,22 @@ import org.reactivestreams.Publisher;
|
||||
import org.springframework.data.domain.Range;
|
||||
import org.springframework.data.redis.connection.Limit;
|
||||
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
|
||||
import org.springframework.data.redis.connection.stream.*;
|
||||
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
import org.springframework.data.redis.connection.stream.MapRecord;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.connection.stream.PendingMessage;
|
||||
import org.springframework.data.redis.connection.stream.PendingMessages;
|
||||
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
import org.springframework.data.redis.connection.stream.Record;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
|
||||
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup;
|
||||
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
|
||||
import org.springframework.data.redis.connection.stream.StreamOffset;
|
||||
import org.springframework.data.redis.connection.stream.StreamReadOptions;
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.hash.HashMapper;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
@@ -40,6 +40,7 @@ import org.springframework.util.Assert;
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
* @author John Blum
|
||||
* @param <K> The type of keys that may be passed during script execution
|
||||
* @since 2.0
|
||||
*/
|
||||
@@ -105,44 +106,41 @@ public class DefaultReactiveScriptExecutor<K> implements ReactiveScriptExecutor<
|
||||
|
||||
Flux<T> result = connection.scriptingCommands().evalSha(script.getSha1(), returnType, numKeys, keysAndArgs);
|
||||
|
||||
result = result.onErrorResume(e -> {
|
||||
result = result.onErrorResume(cause -> {
|
||||
|
||||
if (ScriptUtils.exceptionContainsNoScriptError(e)) {
|
||||
if (ScriptUtils.exceptionContainsNoScriptError(cause)) {
|
||||
return connection.scriptingCommands().eval(scriptBytes(script), returnType, numKeys, keysAndArgs);
|
||||
}
|
||||
|
||||
return Flux
|
||||
.error(e instanceof RuntimeException ? (RuntimeException) e : new RedisSystemException(e.getMessage(), e));
|
||||
return Flux.error(cause instanceof RuntimeException ? cause
|
||||
: new RedisSystemException(cause.getMessage(), cause));
|
||||
});
|
||||
|
||||
return script.returnsRawValue() ? result : deserializeResult(resultReader, result);
|
||||
}
|
||||
|
||||
@SuppressWarnings("Convert2MethodRef")
|
||||
@SuppressWarnings({ "Convert2MethodRef", "rawtypes", "unchecked" })
|
||||
protected ByteBuffer[] keysAndArgs(RedisElementWriter argsWriter, List<K> keys, List<?> args) {
|
||||
|
||||
return Stream.concat(keys.stream().map(t -> keySerializer().getWriter().write(t)),
|
||||
args.stream().map(t -> argsWriter.write(t))).toArray(size -> new ByteBuffer[size]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param script
|
||||
* @return
|
||||
*/
|
||||
protected ByteBuffer scriptBytes(RedisScript<?> script) {
|
||||
return serializationContext.getStringSerializationPair().getWriter().write(script.getScriptAsString());
|
||||
}
|
||||
|
||||
protected <T> Flux<T> deserializeResult(RedisElementReader<T> reader, Flux<T> result) {
|
||||
|
||||
return result.map(it -> {
|
||||
|
||||
T value = ScriptUtils.deserializeResult(reader, it);
|
||||
|
||||
if (value == null) {
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized script result is null");
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
|
||||
return value;
|
||||
throw new InvalidDataAccessApiUsageException("Deserialized script result is null");
|
||||
});
|
||||
}
|
||||
|
||||
@@ -169,6 +167,6 @@ public class DefaultReactiveScriptExecutor<K> implements ReactiveScriptExecutor<
|
||||
}
|
||||
|
||||
public ReactiveRedisConnectionFactory getConnectionFactory() {
|
||||
return connectionFactory;
|
||||
return this.connectionFactory;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,6 +57,25 @@ public abstract class RedisAssertions {
|
||||
return target;
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts the given {@link Object} is not {@literal null} throwing the given {@link RuntimeException}
|
||||
* if {@link Object} is {@literal null}.
|
||||
*
|
||||
* @param <T> {@link Class type} of {@link Object} being asserted.
|
||||
* @param target {@link Object} to evaluate.
|
||||
* @param cause {@link Supplier} of a {@link RuntimeException} to throw
|
||||
* if the given {@link Object} is {@literal null}.
|
||||
* @return the given {@link Object}.
|
||||
*/
|
||||
public static <T> T requireNonNull(@Nullable T target, RuntimeExceptionSupplier cause) {
|
||||
|
||||
if (target == null) {
|
||||
throw cause.get();
|
||||
}
|
||||
|
||||
return target;
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts the given {@link Object} is not {@literal null}.
|
||||
*
|
||||
@@ -85,4 +104,7 @@ public abstract class RedisAssertions {
|
||||
Assert.state(target != null, message);
|
||||
return target;
|
||||
}
|
||||
|
||||
public interface RuntimeExceptionSupplier extends Supplier<RuntimeException> { }
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user