Rename Function Lambda parameter to match command interface in (Default) ReactiveXxxOperations.

Cleanup compiler warning.

Fix unnatural line breaks.

Closes #2658
Original pull request: #2659
This commit is contained in:
John Blum
2023-08-03 16:07:23 -07:00
committed by Mark Paluch
parent ceefb6e24b
commit 00737441fd
8 changed files with 278 additions and 255 deletions

View File

@@ -68,7 +68,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(point, "Point must not be null");
Assert.notNull(member, "Member must not be null");
return createMono(connection -> connection.geoAdd(rawKey(key), point, rawValue(member)));
return createMono(geoCommands -> geoCommands.geoAdd(rawKey(key), point, rawValue(member)));
}
@Override
@@ -77,7 +77,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(key, "Key must not be null");
Assert.notNull(location, "GeoLocation must not be null");
return createMono(connection -> connection.geoAdd(rawKey(key),
return createMono(geoCommands -> geoCommands.geoAdd(rawKey(key),
new GeoLocation<>(rawValue(location.getName()), location.getPoint())));
}
@@ -87,13 +87,13 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(key, "Key must not be null");
Assert.notNull(memberCoordinateMap, "MemberCoordinateMap must not be null");
return createMono(connection -> {
return createMono(geoCommands -> {
Mono<List<GeoLocation<ByteBuffer>>> serializedList = Flux
.fromIterable(() -> memberCoordinateMap.entrySet().iterator())
.map(entry -> new GeoLocation<>(rawValue(entry.getKey()), entry.getValue())).collectList();
return serializedList.flatMap(list -> connection.geoAdd(rawKey(key), list));
return serializedList.flatMap(list -> geoCommands.geoAdd(rawKey(key), list));
});
}
@@ -103,12 +103,12 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(key, "Key must not be null");
Assert.notNull(geoLocations, "GeoLocations must not be null");
return createMono(connection -> {
return createMono(geoCommands -> {
Mono<List<GeoLocation<ByteBuffer>>> serializedList = Flux.fromIterable(geoLocations)
.map(location -> new GeoLocation<>(rawValue(location.getName()), location.getPoint())).collectList();
return serializedList.flatMap(list -> connection.geoAdd(rawKey(key), list));
return serializedList.flatMap(list -> geoCommands.geoAdd(rawKey(key), list));
});
}
@@ -118,11 +118,11 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(key, "Key must not be null");
Assert.notNull(locations, "Locations must not be null");
return createFlux(connection -> Flux.from(locations)
return createFlux(geoCommands -> Flux.from(locations)
.map(locationList -> locationList.stream()
.map(location -> new GeoLocation<>(rawValue(location.getName()), location.getPoint()))
.collect(Collectors.toList()))
.flatMap(list -> connection.geoAdd(rawKey(key), list)));
.flatMap(list -> geoCommands.geoAdd(rawKey(key), list)));
}
@Override
@@ -132,7 +132,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(member1, "Member 1 must not be null");
Assert.notNull(member2, "Member 2 must not be null");
return createMono(connection -> connection.geoDist(rawKey(key), rawValue(member1), rawValue(member2)));
return createMono(geoCommands -> geoCommands.geoDist(rawKey(key), rawValue(member1), rawValue(member2)));
}
@Override
@@ -143,7 +143,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(member2, "Member 2 must not be null");
Assert.notNull(metric, "Metric must not be null");
return createMono(connection -> connection.geoDist(rawKey(key), rawValue(member1), rawValue(member2), metric));
return createMono(geoCommands -> geoCommands.geoDist(rawKey(key), rawValue(member1), rawValue(member2), metric));
}
@Override
@@ -152,7 +152,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(key, "Key must not be null");
Assert.notNull(member, "Member must not be null");
return createMono(connection -> connection.geoHash(rawKey(key), rawValue(member)));
return createMono(geoCommands -> geoCommands.geoHash(rawKey(key), rawValue(member)));
}
@Override
@@ -163,10 +163,10 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notEmpty(members, "Members must not be null or empty");
Assert.noNullElements(members, "Members must not contain null elements");
return createMono(connection -> Flux.fromArray(members) //
return createMono(geoCommands -> Flux.fromArray(members) //
.map(this::rawValue) //
.collectList() //
.flatMap(serialized -> connection.geoHash(rawKey(key), serialized)));
.flatMap(serialized -> geoCommands.geoHash(rawKey(key), serialized)));
}
@Override
@@ -175,7 +175,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(key, "Key must not be null");
Assert.notNull(member, "Member must not be null");
return createMono(connection -> connection.geoPos(rawKey(key), rawValue(member)));
return createMono(geoCommands -> geoCommands.geoPos(rawKey(key), rawValue(member)));
}
@Override
@@ -186,10 +186,10 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notEmpty(members, "Members must not be null or empty");
Assert.noNullElements(members, "Members must not contain null elements");
return createMono(connection -> Flux.fromArray(members) //
return createMono(geoCommands -> Flux.fromArray(members) //
.map(this::rawValue) //
.collectList() //
.flatMap(serialized -> connection.geoPos(rawKey(key), serialized)));
.flatMap(serialized -> geoCommands.geoPos(rawKey(key), serialized)));
}
@Override
@@ -198,7 +198,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(key, "Key must not be null");
Assert.notNull(within, "Circle must not be null");
return createFlux(connection -> connection.geoRadius(rawKey(key), within).map(this::readGeoResult));
return createFlux(geoCommands -> geoCommands.geoRadius(rawKey(key), within).map(this::readGeoResult));
}
@Override
@@ -208,7 +208,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(within, "Circle must not be null");
Assert.notNull(args, "GeoRadiusCommandArgs must not be null");
return createFlux(connection -> connection.geoRadius(rawKey(key), within, args) //
return createFlux(geoCommands -> geoCommands.geoRadius(rawKey(key), within, args) //
.map(this::readGeoResult));
}
@@ -218,8 +218,9 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(key, "Key must not be null");
Assert.notNull(member, "Member must not be null");
return createFlux(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), new Distance(radius)) //
.map(this::readGeoResult));
return createFlux(geoCommands ->
geoCommands.geoRadiusByMember(rawKey(key), rawValue(member), new Distance(radius)) //
.map(this::readGeoResult));
}
@Override
@@ -229,7 +230,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(member, "Member must not be null");
Assert.notNull(distance, "Distance must not be null");
return createFlux(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), distance) //
return createFlux(geoCommands -> geoCommands.geoRadiusByMember(rawKey(key), rawValue(member), distance) //
.map(this::readGeoResult));
}
@@ -241,7 +242,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
Assert.notNull(distance, "Distance must not be null");
Assert.notNull(args, "GeoRadiusCommandArgs must not be null");
return createFlux(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), distance, args))
return createFlux(geoCommands -> geoCommands.geoRadiusByMember(rawKey(key), rawValue(member), distance, args))
.map(this::readGeoResult);
}
@@ -308,8 +309,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
@SuppressWarnings("unchecked")
private GeoReference<ByteBuffer> getGeoReference(GeoReference<V> reference) {
return reference instanceof GeoReference.GeoMemberReference
? GeoReference
.fromMember(rawValue(((GeoMemberReference<V>) reference).getMember()))
? GeoReference.fromMember(rawValue(((GeoMemberReference<V>) reference).getMember()))
: (GeoReference<ByteBuffer>) reference;
}
@@ -327,7 +327,7 @@ class DefaultReactiveGeoOperations<K, V> implements ReactiveGeoOperations<K, V>
private GeoResult<GeoLocation<V>> readGeoResult(GeoResult<GeoLocation<ByteBuffer>> source) {
return new GeoResult<>(new GeoLocation(readValue(source.getContent().getName()), source.getContent().getPoint()),
return new GeoResult<>(new GeoLocation<>(readValue(source.getContent().getName()), source.getContent().getPoint()),
source.getDistance());
}
}

View File

@@ -60,10 +60,10 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notEmpty(hashKeys, "Hash keys must not be empty");
Assert.noNullElements(hashKeys, "Hash keys must not contain null elements");
return createMono(connection -> Flux.fromArray(hashKeys) //
return createMono(hashCommands -> Flux.fromArray(hashKeys) //
.map(o -> (HK) o).map(this::rawHashKey) //
.collectList() //
.flatMap(hks -> connection.hDel(rawKey(key), hks)));
.flatMap(hks -> hashCommands.hDel(rawKey(key), hks)));
}
@Override
@@ -73,7 +73,7 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(key, "Key must not be null");
Assert.notNull(hashKey, "Hash key must not be null");
return createMono(connection -> connection.hExists(rawKey(key), rawHashKey((HK) hashKey)));
return createMono(hashCommands -> hashCommands.hExists(rawKey(key), rawHashKey((HK) hashKey)));
}
@Override
@@ -83,7 +83,8 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(key, "Key must not be null");
Assert.notNull(hashKey, "Hash key must not be null");
return createMono(connection -> connection.hGet(rawKey(key), rawHashKey((HK) hashKey)).map(this::readHashValue));
return createMono(hashCommands ->
hashCommands.hGet(rawKey(key), rawHashKey((HK) hashKey)).map(this::readHashValue));
}
@Override
@@ -93,10 +94,10 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(hashKeys, "Hash keys must not be null");
Assert.notEmpty(hashKeys, "Hash keys must not be empty");
return createMono(connection -> Flux.fromIterable(hashKeys) //
return createMono(hashCommands -> Flux.fromIterable(hashKeys) //
.map(this::rawHashKey) //
.collectList() //
.flatMap(hks -> connection.hMGet(rawKey(key), hks)).map(this::deserializeHashValues));
.flatMap(hks -> hashCommands.hMGet(rawKey(key), hks)).map(this::deserializeHashValues));
}
@Override
@@ -162,7 +163,7 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(key, "Key must not be null");
return createFlux(connection -> connection.hKeys(rawKey(key)) //
return createFlux(hashCommands -> hashCommands.hKeys(rawKey(key)) //
.map(this::readHashKey));
}
@@ -171,7 +172,7 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.hLen(rawKey(key)));
return createMono(hashCommands -> hashCommands.hLen(rawKey(key)));
}
@Override
@@ -180,9 +181,9 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(key, "Key must not be null");
Assert.notNull(map, "Map must not be null");
return createMono(connection -> Flux.fromIterable(() -> map.entrySet().iterator()) //
return createMono(hashCommands -> Flux.fromIterable(() -> map.entrySet().iterator()) //
.collectMap(entry -> rawHashKey(entry.getKey()), entry -> rawHashValue(entry.getValue())) //
.flatMap(serialized -> connection.hMSet(rawKey(key), serialized)));
.flatMap(serialized -> hashCommands.hMSet(rawKey(key), serialized)));
}
@Override
@@ -192,7 +193,7 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(hashKey, "Hash key must not be null");
Assert.notNull(value, "Hash value must not be null");
return createMono(connection -> connection.hSet(rawKey(key), rawHashKey(hashKey), rawHashValue(value)));
return createMono(hashCommands -> hashCommands.hSet(rawKey(key), rawHashKey(hashKey), rawHashValue(value)));
}
@Override
@@ -202,7 +203,7 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(hashKey, "Hash key must not be null");
Assert.notNull(value, "Hash value must not be null");
return createMono(connection -> connection.hSetNX(rawKey(key), rawHashKey(hashKey), rawHashValue(value)));
return createMono(hashCommands -> hashCommands.hSetNX(rawKey(key), rawHashKey(hashKey), rawHashValue(value)));
}
@Override
@@ -210,7 +211,7 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(key, "Key must not be null");
return createFlux(connection -> connection.hVals(rawKey(key)) //
return createFlux(hashCommands -> hashCommands.hVals(rawKey(key)) //
.map(this::readHashValue));
}
@@ -219,7 +220,7 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(key, "Key must not be null");
return createFlux(connection -> connection.hGetAll(rawKey(key)) //
return createFlux(hashCommands -> hashCommands.hGetAll(rawKey(key)) //
.map(this::deserializeHashEntry));
}
@@ -229,7 +230,7 @@ class DefaultReactiveHashOperations<H, HK, HV> implements ReactiveHashOperations
Assert.notNull(key, "Key must not be null");
Assert.notNull(key, "ScanOptions must not be null");
return createFlux(connection -> connection.hScan(rawKey(key), options) //
return createFlux(hashCommands -> hashCommands.hScan(rawKey(key), options) //
.map(this::deserializeHashEntry));
}

View File

@@ -53,10 +53,10 @@ class DefaultReactiveHyperLogLogOperations<K, V> implements ReactiveHyperLogLogO
Assert.notEmpty(values, "Values must not be null or empty");
Assert.noNullElements(values, "Values must not contain null elements");
return createMono(connection -> Flux.fromArray(values) //
return createMono(hyperLogLogCommands -> Flux.fromArray(values) //
.map(this::rawValue) //
.collectList() //
.flatMap(serializedValues -> connection.pfAdd(rawKey(key), serializedValues)));
.flatMap(serializedValues -> hyperLogLogCommands.pfAdd(rawKey(key), serializedValues)));
}
@Override
@@ -66,10 +66,10 @@ class DefaultReactiveHyperLogLogOperations<K, V> implements ReactiveHyperLogLogO
Assert.notEmpty(keys, "Keys must not be null or empty");
Assert.noNullElements(keys, "Keys must not contain null elements");
return createMono(connection -> Flux.fromArray(keys) //
return createMono(hyperLogLogCommands -> Flux.fromArray(keys) //
.map(this::rawKey) //
.collectList() //
.flatMap(connection::pfCount));
.flatMap(hyperLogLogCommands::pfCount));
}
@Override
@@ -80,10 +80,10 @@ class DefaultReactiveHyperLogLogOperations<K, V> implements ReactiveHyperLogLogO
Assert.notEmpty(sourceKeys, "Source keys must not be null or empty");
Assert.noNullElements(sourceKeys, "Source keys must not contain null elements");
return createMono(connection -> Flux.fromArray(sourceKeys) //
return createMono(hyperLogLogCommands -> Flux.fromArray(sourceKeys) //
.map(this::rawKey) //
.collectList() //
.flatMap(serialized -> connection.pfMerge(rawKey(destination), serialized)));
.flatMap(serialized -> hyperLogLogCommands.pfMerge(rawKey(destination), serialized)));
}
@Override

View File

@@ -58,7 +58,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createFlux(connection -> connection.lRange(rawKey(key), start, end).map(this::readValue));
return createFlux(listCommands -> listCommands.lRange(rawKey(key), start, end).map(this::readValue));
}
@Override
@@ -66,7 +66,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.lTrim(rawKey(key), start, end));
return createMono(listCommands -> listCommands.lTrim(rawKey(key), start, end));
}
@Override
@@ -74,7 +74,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.lLen(rawKey(key)));
return createMono(listCommands -> listCommands.lLen(rawKey(key)));
}
@Override
@@ -97,10 +97,10 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notEmpty(values, "Values must not be null or empty");
return createMono(connection -> Flux.fromIterable(values) //
return createMono(listCommands -> Flux.fromIterable(values) //
.map(this::rawValue) //
.collectList() //
.flatMap(serialized -> connection.lPush(rawKey(key), serialized)));
.flatMap(serialized -> listCommands.lPush(rawKey(key), serialized)));
}
@Override
@@ -108,7 +108,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.lPushX(rawKey(key), rawValue(value)));
return createMono(listCommands -> listCommands.lPushX(rawKey(key), rawValue(value)));
}
@Override
@@ -116,7 +116,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.lInsert(rawKey(key), Position.BEFORE, rawValue(pivot), rawValue(value)));
return createMono(listCommands ->
listCommands.lInsert(rawKey(key), Position.BEFORE, rawValue(pivot), rawValue(value)));
}
@Override
@@ -139,10 +140,10 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notEmpty(values, "Values must not be null or empty");
return createMono(connection -> Flux.fromIterable(values) //
return createMono(listCommands -> Flux.fromIterable(values) //
.map(this::rawValue) //
.collectList() //
.flatMap(serialized -> connection.rPush(rawKey(key), serialized)));
.flatMap(serialized -> listCommands.rPush(rawKey(key), serialized)));
}
@Override
@@ -150,7 +151,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.rPushX(rawKey(key), rawValue(value)));
return createMono(listCommands -> listCommands.rPushX(rawKey(key), rawValue(value)));
}
@Override
@@ -158,7 +159,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.lInsert(rawKey(key), Position.AFTER, rawValue(pivot), rawValue(value)));
return createMono(listCommands ->
listCommands.lInsert(rawKey(key), Position.AFTER, rawValue(pivot), rawValue(value)));
}
@Override
@@ -169,8 +171,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(from, "From direction must not be null");
Assert.notNull(to, "To direction must not be null");
return createMono(
connection -> connection.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to).map(this::readValue));
return createMono(listCommands ->
listCommands.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to).map(this::readValue));
}
@Override
@@ -182,8 +184,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(to, "To direction must not be null");
Assert.notNull(timeout, "Timeout must not be null");
return createMono(connection -> connection.bLMove(rawKey(sourceKey), rawKey(destinationKey), from, to, timeout)
.map(this::readValue));
return createMono(listCommands ->
listCommands.bLMove(rawKey(sourceKey), rawKey(destinationKey), from, to, timeout).map(this::readValue));
}
@Override
@@ -191,7 +193,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.lSet(rawKey(key), index, rawValue(value)));
return createMono(listCommands -> listCommands.lSet(rawKey(key), index, rawValue(value)));
}
@Override
@@ -200,7 +202,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.lRem(rawKey(key), count, rawValue((V) value)));
return createMono(listCommands -> listCommands.lRem(rawKey(key), count, rawValue((V) value)));
}
@Override
@@ -208,7 +210,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.lIndex(rawKey(key), index).map(this::readValue));
return createMono(listCommands -> listCommands.lIndex(rawKey(key), index).map(this::readValue));
}
@Override
@@ -216,7 +218,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.lPos(rawKey(key), rawValue(value)));
return createMono(listCommands -> listCommands.lPos(rawKey(key), rawValue(value)));
}
@Override
@@ -224,7 +226,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.lPos(LPosCommand.lPosOf(rawValue(value)).from(rawKey(key)).rank(-1)));
return createMono(listCommands ->
listCommands.lPos(LPosCommand.lPosOf(rawValue(value)).from(rawKey(key)).rank(-1)));
}
@Override
@@ -232,7 +235,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.lPop(rawKey(key)).map(this::readValue));
return createMono(listCommands -> listCommands.lPop(rawKey(key)).map(this::readValue));
}
@@ -243,8 +246,9 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
return createMono(connection -> connection.blPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
return createMono(listCommands ->
listCommands.blPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
}
@Override
@@ -252,7 +256,7 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.rPop(rawKey(key)).map(this::readValue));
return createMono(listCommands -> listCommands.rPop(rawKey(key)).map(this::readValue));
}
@Override
@@ -262,8 +266,9 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
return createMono(connection -> connection.brPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
return createMono(listCommands ->
listCommands.brPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
}
@Override
@@ -272,8 +277,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(sourceKey, "Source key must not be null");
Assert.notNull(destinationKey, "Destination key must not be null");
return createMono(
connection -> connection.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)).map(this::readValue));
return createMono(listCommands ->
listCommands.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)).map(this::readValue));
}
@Override
@@ -284,8 +289,8 @@ class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
return createMono(
connection -> connection.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue));
return createMono(listCommands ->
listCommands.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue));
}
@Override

View File

@@ -53,18 +53,19 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
}
@Override
@SuppressWarnings("unchecked")
public Mono<Long> add(K key, V... values) {
Assert.notNull(key, "Key must not be null");
if (values.length == 1) {
return createMono(connection -> connection.sAdd(rawKey(key), rawValue(values[0])));
return createMono(setCommands -> setCommands.sAdd(rawKey(key), rawValue(values[0])));
}
return createMono(connection -> Flux.fromArray(values) //
return createMono(setCommands -> Flux.fromArray(values) //
.map(this::rawValue) //
.collectList() //
.flatMap(serialized -> connection.sAdd(rawKey(key), serialized)));
.flatMap(serialized -> setCommands.sAdd(rawKey(key), serialized)));
}
@Override
@@ -74,13 +75,13 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
if (values.length == 1) {
return createMono(connection -> connection.sRem(rawKey(key), rawValue((V) values[0])));
return createMono(setCommands -> setCommands.sRem(rawKey(key), rawValue((V) values[0])));
}
return createMono(connection -> Flux.fromArray((V[]) values) //
return createMono(setCommands -> Flux.fromArray((V[]) values) //
.map(this::rawValue) //
.collectList() //
.flatMap(serialized -> connection.sRem(rawKey(key), serialized)));
.flatMap(serialized -> setCommands.sRem(rawKey(key), serialized)));
}
@Override
@@ -88,7 +89,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.sPop(rawKey(key)).map(this::readValue));
return createMono(setCommands -> setCommands.sPop(rawKey(key)).map(this::readValue));
}
@Override
@@ -96,7 +97,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
return createFlux(connection -> connection.sPop(rawKey(key), count).map(this::readValue));
return createFlux(setCommands -> setCommands.sPop(rawKey(key), count).map(this::readValue));
}
@Override
@@ -105,7 +106,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(sourceKey, "Source key must not be null");
Assert.notNull(destKey, "Destination key must not be null");
return createMono(connection -> connection.sMove(rawKey(sourceKey), rawKey(destKey), rawValue(value)));
return createMono(setCommands -> setCommands.sMove(rawKey(sourceKey), rawKey(destKey), rawValue(value)));
}
@Override
@@ -113,7 +114,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.sCard(rawKey(key)));
return createMono(setCommands -> setCommands.sCard(rawKey(key)));
}
@Override
@@ -122,31 +123,29 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.sIsMember(rawKey(key), rawValue((V) o)));
return createMono(setCommands -> setCommands.sIsMember(rawKey(key), rawValue((V) o)));
}
@Override
@SuppressWarnings("unchecked")
public Mono<Map<Object, Boolean>> isMember(K key, Object... objects) {
Assert.notNull(key, "Key must not be null");
return createMono(connection -> {
return createMono(setCommands -> Flux.fromArray((V[]) objects) //
.map(this::rawValue) //
.collectList() //
.flatMap(rawValues -> setCommands.sMIsMember(rawKey(key), rawValues)) //
.map(result -> {
return Flux.fromArray((V[]) objects) //
.map(this::rawValue) //
.collectList() //
.flatMap(rawValues -> connection.sMIsMember(rawKey(key), rawValues)) //
.map(result -> {
Map<Object, Boolean> isMember = new LinkedHashMap<>(result.size());
Map<Object, Boolean> isMember = new LinkedHashMap<>(result.size());
for (int i = 0; i < objects.length; i++) {
isMember.put(objects[i], result.get(i));
}
for (int i = 0; i < objects.length; i++) {
isMember.put(objects[i], result.get(i));
}
return isMember;
});
});
return isMember;
}));
}
@Override
@@ -172,10 +171,10 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(keys, "Keys must not be null");
return createFlux(connection -> Flux.fromIterable(keys) //
return createFlux(setCommands -> Flux.fromIterable(keys) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(connection::sInter) //
.flatMapMany(setCommands::sInter) //
.map(this::readValue));
}
@@ -205,10 +204,10 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(keys, "Keys must not be null");
Assert.notNull(destKey, "Destination key must not be null");
return createMono(connection -> Flux.fromIterable(keys) //
return createMono(setCommands -> Flux.fromIterable(keys) //
.map(this::rawKey) //
.collectList() //
.flatMap(rawKeys -> connection.sInterStore(rawKey(destKey), rawKeys)));
.flatMap(rawKeys -> setCommands.sInterStore(rawKey(destKey), rawKeys)));
}
@Override
@@ -234,10 +233,10 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(keys, "Keys must not be null");
return createFlux(connection -> Flux.fromIterable(keys) //
return createFlux(setCommands -> Flux.fromIterable(keys) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(connection::sUnion) //
.flatMapMany(setCommands::sUnion) //
.map(this::readValue));
}
@@ -267,10 +266,10 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(keys, "Keys must not be null");
Assert.notNull(destKey, "Destination key must not be null");
return createMono(connection -> Flux.fromIterable(keys) //
return createMono(setCommands -> Flux.fromIterable(keys) //
.map(this::rawKey) //
.collectList() //
.flatMap(rawKeys -> connection.sUnionStore(rawKey(destKey), rawKeys)));
.flatMap(rawKeys -> setCommands.sUnionStore(rawKey(destKey), rawKeys)));
}
@Override
@@ -296,10 +295,10 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(keys, "Keys must not be null");
return createFlux(connection -> Flux.fromIterable(keys) //
return createFlux(setCommands -> Flux.fromIterable(keys) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(connection::sDiff) //
.flatMapMany(setCommands::sDiff) //
.map(this::readValue));
}
@@ -329,10 +328,10 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(keys, "Keys must not be null");
Assert.notNull(destKey, "Destination key must not be null");
return createMono(connection -> Flux.fromIterable(keys) //
return createMono(setCommands -> Flux.fromIterable(keys) //
.map(this::rawKey) //
.collectList() //
.flatMap(rawKeys -> connection.sDiffStore(rawKey(destKey), rawKeys)));
.flatMap(rawKeys -> setCommands.sDiffStore(rawKey(destKey), rawKeys)));
}
@Override
@@ -340,7 +339,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
return createFlux(connection -> connection.sMembers(rawKey(key)).map(this::readValue));
return createFlux(setCommands -> setCommands.sMembers(rawKey(key)).map(this::readValue));
}
@Override
@@ -349,7 +348,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
Assert.notNull(options, "ScanOptions must not be null");
return createFlux(connection -> connection.sScan(rawKey(key), options).map(this::readValue));
return createFlux(setCommands -> setCommands.sScan(rawKey(key), options).map(this::readValue));
}
@Override
@@ -357,7 +356,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.sRandMember(rawKey(key)).map(this::readValue));
return createMono(setCommands -> setCommands.sRandMember(rawKey(key)).map(this::readValue));
}
@Override
@@ -365,7 +364,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements");
return createFlux(connection -> connection.sRandMember(rawKey(key), count).map(this::readValue));
return createFlux(setCommands -> setCommands.sRandMember(rawKey(key), count).map(this::readValue));
}
@Override
@@ -373,7 +372,7 @@ class DefaultReactiveSetOperations<K, V> implements ReactiveSetOperations<K, V>
Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements");
return createFlux(connection -> connection.sRandMember(rawKey(key), -count).map(this::readValue));
return createFlux(setCommands -> setCommands.sRandMember(rawKey(key), -count).map(this::readValue));
}
@Override

View File

@@ -81,7 +81,7 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
if (objectMapper.isSimpleType(targetType) || ClassUtils.isAssignable(ByteBuffer.class, targetType)) {
return new HashMapper<Object, Object, Object>() {
return new HashMapper<>() {
@Override
public Map<Object, Object> toHash(Object object) {
@@ -101,7 +101,9 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
@Override
public Object fromHash(Map<Object, Object> hash) {
Object value = hash.values().iterator().next();
if (ClassUtils.isAssignableValue(targetType, value)) {
return value;
}
@@ -130,7 +132,7 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
Assert.notNull(recordIds, "MessageIds must not be null");
Assert.notEmpty(recordIds, "MessageIds must not be empty");
return createMono(connection -> connection.xAck(rawKey(key), group, recordIds));
return createMono(streamCommands -> streamCommands.xAck(rawKey(key), group, recordIds));
}
@Override
@@ -141,13 +143,13 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);
return createMono(connection -> connection.xAdd(serializeRecord(input)));
return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input)));
}
@Override
public Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
return createFlux(connection -> connection.xClaim(rawKey(key), consumerGroup, newOwner, xClaimOptions)
return createFlux(streamCommands -> streamCommands.xClaim(rawKey(key), consumerGroup, newOwner, xClaimOptions)
.map(this::deserializeRecord));
}
@@ -157,7 +159,7 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
Assert.notNull(key, "Key must not be null");
Assert.notNull(recordIds, "MessageIds must not be null");
return createMono(connection -> connection.xDel(rawKey(key), recordIds));
return createMono(streamCommands -> streamCommands.xDel(rawKey(key), recordIds));
}
@Override
@@ -167,7 +169,7 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
Assert.notNull(readOffset, "ReadOffset must not be null");
Assert.notNull(group, "Group must not be null");
return createMono(connection -> connection.xGroupCreate(rawKey(key), group, readOffset, true));
return createMono(streamCommands -> streamCommands.xGroupCreate(rawKey(key), group, readOffset, true));
}
@Override
@@ -176,7 +178,7 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
Assert.notNull(key, "Key must not be null");
Assert.notNull(consumer, "Consumer must not be null");
return createMono(connection -> connection.xGroupDelConsumer(rawKey(key), consumer));
return createMono(streamCommands -> streamCommands.xGroupDelConsumer(rawKey(key), consumer));
}
@Override
@@ -185,7 +187,7 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
Assert.notNull(key, "Key must not be null");
Assert.notNull(group, "Group must not be null");
return createMono(connection -> connection.xGroupDestroy(rawKey(key), group));
return createMono(streamCommands -> streamCommands.xGroupDestroy(rawKey(key), group));
}
@Override
@@ -194,7 +196,7 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
Assert.notNull(key, "Key must not be null");
Assert.notNull(group, "Group must not be null");
return createFlux(connection -> connection.xInfoConsumers(rawKey(key), group));
return createFlux(streamCommands -> streamCommands.xInfoConsumers(rawKey(key), group));
}
@Override
@@ -202,7 +204,7 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.xInfo(rawKey(key)));
return createMono(streamCommands -> streamCommands.xInfo(rawKey(key)));
}
@Override
@@ -210,28 +212,31 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
Assert.notNull(key, "Key must not be null");
return createFlux(connection -> connection.xInfoGroups(rawKey(key)));
return createFlux(streamCommands -> streamCommands.xInfoGroups(rawKey(key)));
}
@Override
public Mono<PendingMessages> pending(K key, String group, Range<?> range, long count) {
ByteBuffer rawKey = rawKey(key);
return createMono(connection -> connection.xPending(rawKey, group, range, count));
return createMono(streamCommands -> streamCommands.xPending(rawKey, group, range, count));
}
@Override
public Mono<PendingMessages> pending(K key, Consumer consumer, Range<?> range, long count) {
ByteBuffer rawKey = rawKey(key);
return createMono(connection -> connection.xPending(rawKey, consumer, range, count));
return createMono(streamCommands -> streamCommands.xPending(rawKey, consumer, range, count));
}
@Override
public Mono<PendingMessagesSummary> pending(K key, String group) {
ByteBuffer rawKey = rawKey(key);
return createMono(connection -> connection.xPending(rawKey, group));
return createMono(streamCommands -> streamCommands.xPending(rawKey, group));
}
@Override
@@ -239,7 +244,7 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.xLen(rawKey(key)));
return createMono(streamCommands -> streamCommands.xLen(rawKey(key)));
}
@Override
@@ -249,35 +254,39 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
Assert.notNull(range, "Range must not be null");
Assert.notNull(limit, "Limit must not be null");
return createFlux(connection -> connection.xRange(rawKey(key), range, limit).map(this::deserializeRecord));
return createFlux(streamCommands ->
streamCommands.xRange(rawKey(key), range, limit).map(this::deserializeRecord));
}
@Override
@SuppressWarnings("unchecked")
public Flux<MapRecord<K, HK, HV>> read(StreamReadOptions readOptions, StreamOffset<K>... streams) {
Assert.notNull(readOptions, "StreamReadOptions must not be null");
Assert.notNull(streams, "Streams must not be null");
return createFlux(connection -> {
return createFlux(streamCommands -> {
StreamOffset<ByteBuffer>[] streamOffsets = rawStreamOffsets(streams);
return connection.xRead(readOptions, streamOffsets).map(this::deserializeRecord);
return streamCommands.xRead(readOptions, streamOffsets).map(this::deserializeRecord);
});
}
@Override
public Flux<MapRecord<K, HK, HV>> read(Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams) {
@SuppressWarnings("unchecked")
public Flux<MapRecord<K, HK, HV>> read(Consumer consumer, StreamReadOptions readOptions,
StreamOffset<K>... streams) {
Assert.notNull(consumer, "Consumer must not be null");
Assert.notNull(readOptions, "StreamReadOptions must not be null");
Assert.notNull(streams, "Streams must not be null");
return createFlux(connection -> {
return createFlux(streamCommands -> {
StreamOffset<ByteBuffer>[] streamOffsets = rawStreamOffsets(streams);
return connection.xReadGroup(consumer, readOptions, streamOffsets).map(this::deserializeRecord);
return streamCommands.xReadGroup(consumer, readOptions, streamOffsets).map(this::deserializeRecord);
});
}
@@ -288,7 +297,8 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
Assert.notNull(range, "Range must not be null");
Assert.notNull(limit, "Limit must not be null");
return createFlux(connection -> connection.xRevRange(rawKey(key), range, limit).map(this::deserializeRecord));
return createFlux(streamCommands ->
streamCommands.xRevRange(rawKey(key), range, limit).map(this::deserializeRecord));
}
@Override
@@ -300,7 +310,7 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
public Mono<Long> trim(K key, long count, boolean approximateTrimming) {
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.xTrim(rawKey(key), count, approximateTrimming));
return createMono(streamCommands -> streamCommands.xTrim(rawKey(key), count, approximateTrimming));
}
@Override
@@ -333,20 +343,20 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
return serializationContext.getKeySerializationPair().write(key);
}
@SuppressWarnings("unchecked")
private ByteBuffer rawHashKey(HK key) {
try {
return serializationContext.getHashKeySerializationPair().write(key);
} catch (IllegalStateException e) {}
} catch (IllegalStateException ignore) {}
return ByteBuffer.wrap(objectMapper.getConversionService().convert(key, byte[].class));
}
@SuppressWarnings("unchecked")
private ByteBuffer rawValue(HV value) {
try {
return serializationContext.getHashValueSerializationPair().write(value);
} catch (IllegalStateException e) {}
} catch (IllegalStateException ignore) {}
return ByteBuffer.wrap(objectMapper.getConversionService().convert(value, byte[].class));
}

View File

@@ -60,7 +60,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.set(rawKey(key), rawValue(value)));
return createMono(stringCommands -> stringCommands.set(rawKey(key), rawValue(value)));
}
@Override
@@ -69,8 +69,8 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Duration must not be null");
return createMono(
connection -> connection.set(rawKey(key), rawValue(value), Expiration.from(timeout), SetOption.UPSERT));
return createMono(stringCommands ->
stringCommands.set(rawKey(key), rawValue(value), Expiration.from(timeout), SetOption.UPSERT));
}
@Override
@@ -78,8 +78,8 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(
connection -> connection.set(rawKey(key), rawValue(value), Expiration.persistent(), SetOption.SET_IF_ABSENT));
return createMono(stringCommands ->
stringCommands.set(rawKey(key), rawValue(value), Expiration.persistent(), SetOption.SET_IF_ABSENT));
}
@Override
@@ -88,8 +88,8 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Duration must not be null");
return createMono(
connection -> connection.set(rawKey(key), rawValue(value), Expiration.from(timeout), SetOption.SET_IF_ABSENT));
return createMono(stringCommands ->
stringCommands.set(rawKey(key), rawValue(value), Expiration.from(timeout), SetOption.SET_IF_ABSENT));
}
@Override
@@ -97,8 +97,8 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(
connection -> connection.set(rawKey(key), rawValue(value), Expiration.persistent(), SetOption.SET_IF_PRESENT));
return createMono(stringCommands ->
stringCommands.set(rawKey(key), rawValue(value), Expiration.persistent(), SetOption.SET_IF_PRESENT));
}
@Override
@@ -107,8 +107,8 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Duration must not be null");
return createMono(
connection -> connection.set(rawKey(key), rawValue(value), Expiration.from(timeout), SetOption.SET_IF_PRESENT));
return createMono(stringCommands ->
stringCommands.set(rawKey(key), rawValue(value), Expiration.from(timeout), SetOption.SET_IF_PRESENT));
}
@Override
@@ -116,12 +116,12 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(map, "Map must not be null");
return createMono(connection -> {
return createMono(stringCommands -> {
Mono<Map<ByteBuffer, ByteBuffer>> serializedMap = Flux.fromIterable(() -> map.entrySet().iterator())
.collectMap(entry -> rawKey(entry.getKey()), entry -> rawValue(entry.getValue()));
return serializedMap.flatMap(connection::mSet);
return serializedMap.flatMap(stringCommands::mSet);
});
}
@@ -130,12 +130,12 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(map, "Map must not be null");
return createMono(connection -> {
return createMono(stringCommands -> {
Mono<Map<ByteBuffer, ByteBuffer>> serializedMap = Flux.fromIterable(() -> map.entrySet().iterator())
.collectMap(entry -> rawKey(entry.getKey()), entry -> rawValue(entry.getValue()));
return serializedMap.flatMap(connection::mSetNX);
return serializedMap.flatMap(stringCommands::mSetNX);
});
}
@@ -145,7 +145,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.get(rawKey((K) key)) //
return createMono(stringCommands -> stringCommands.get(rawKey((K) key)) //
.map(this::readValue));
}
@@ -154,7 +154,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.getDel(rawKey(key)) //
return createMono(stringCommands -> stringCommands.getDel(rawKey(key)) //
.map(this::readValue));
}
@@ -164,7 +164,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Timeout must not be null");
return createMono(connection -> connection.getEx(rawKey(key), Expiration.from(timeout)) //
return createMono(stringCommands -> stringCommands.getEx(rawKey(key), Expiration.from(timeout)) //
.map(this::readValue));
}
@@ -173,7 +173,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.getEx(rawKey(key), Expiration.persistent()) //
return createMono(stringCommands -> stringCommands.getEx(rawKey(key), Expiration.persistent()) //
.map(this::readValue));
}
@@ -182,7 +182,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.getSet(rawKey(key), rawValue(value)).map(value()::read));
return createMono(stringCommands -> stringCommands.getSet(rawKey(key), rawValue(value)).map(value()::read));
}
@Override
@@ -190,8 +190,9 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(keys, "Keys must not be null");
return createMono(connection -> Flux.fromIterable(keys).map(key()::write).collectList().flatMap(connection::mGet)
.map(this::deserializeValues));
return createMono(stringCommands ->
Flux.fromIterable(keys).map(key()::write).collectList().flatMap(stringCommands::mGet)
.map(this::deserializeValues));
}
@Override
@@ -240,8 +241,8 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
Assert.notNull(value, "Value must not be null");
return createMono(
connection -> connection.append(rawKey(key), serializationContext.getStringSerializationPair().write(value)));
return createMono(stringCommands ->
stringCommands.append(rawKey(key), serializationContext.getStringSerializationPair().write(value)));
}
@Override
@@ -249,7 +250,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.getRange(rawKey(key), start, end) //
return createMono(stringCommands -> stringCommands.getRange(rawKey(key), start, end) //
.map(stringSerializationPair()::read));
}
@@ -258,7 +259,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.setRange(rawKey(key), rawValue(value), offset));
return createMono(stringCommands -> stringCommands.setRange(rawKey(key), rawValue(value), offset));
}
@Override
@@ -266,7 +267,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.strLen(rawKey(key)));
return createMono(stringCommands -> stringCommands.strLen(rawKey(key)));
}
@Override
@@ -274,7 +275,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.setBit(rawKey(key), offset, value));
return createMono(stringCommands -> stringCommands.setBit(rawKey(key), offset, value));
}
@Override
@@ -282,7 +283,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.getBit(rawKey(key), offset));
return createMono(stringCommands -> stringCommands.getBit(rawKey(key), offset));
}
@Override
@@ -291,7 +292,7 @@ class DefaultReactiveValueOperations<K, V> implements ReactiveValueOperations<K,
Assert.notNull(key, "Key must not be null");
Assert.notNull(subCommands, "BitFieldSubCommands must not be null");
return createMono(connection -> connection.bitField(rawKey(key), subCommands));
return createMono(stringCommands -> stringCommands.bitField(rawKey(key), subCommands));
}
@Override

View File

@@ -65,7 +65,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.zAdd(rawKey(key), score, rawValue(value)).map(l -> l != 0));
return createMono(zSetCommands -> zSetCommands.zAdd(rawKey(key), score, rawValue(value)).map(l -> l != 0));
}
@Override
@@ -74,10 +74,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(tuples, "Key must not be null");
return createMono(connection -> Flux.fromIterable(tuples) //
return createMono(zSetCommands -> Flux.fromIterable(tuples) //
.map(t -> new DefaultTuple(ByteUtils.getBytes(rawValue(t.getValue())), t.getScore())) //
.collectList() //
.flatMap(serialized -> connection.zAdd(rawKey(key), serialized)));
.flatMap(serialized -> zSetCommands.zAdd(rawKey(key), serialized)));
}
@Override
@@ -88,13 +88,13 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(values, "Values must not be null");
if (values.length == 1) {
return createMono(connection -> connection.zRem(rawKey(key), rawValue((V) values[0])));
return createMono(zSetCommands -> zSetCommands.zRem(rawKey(key), rawValue((V) values[0])));
}
return createMono(connection -> Flux.fromArray((V[]) values) //
return createMono(zSetCommands -> Flux.fromArray((V[]) values) //
.map(this::rawValue) //
.collectList() //
.flatMap(serialized -> connection.zRem(rawKey(key), serialized)));
.flatMap(serialized -> zSetCommands.zRem(rawKey(key), serialized)));
}
@Override
@@ -102,7 +102,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.zIncrBy(rawKey(key), delta, rawValue(value)));
return createMono(zSetCommands -> zSetCommands.zIncrBy(rawKey(key), delta, rawValue(value)));
}
@Override
@@ -110,7 +110,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.zRandMember(rawKey(key))).map(this::readValue);
return createMono(zSetCommands -> zSetCommands.zRandMember(rawKey(key))).map(this::readValue);
}
@Override
@@ -119,7 +119,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements");
return createFlux(connection -> connection.zRandMember(rawKey(key), count)).map(this::readValue);
return createFlux(zSetCommands -> zSetCommands.zRandMember(rawKey(key), count)).map(this::readValue);
}
@Override
@@ -128,7 +128,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements");
return createFlux(connection -> connection.zRandMember(rawKey(key), -count)).map(this::readValue);
return createFlux(zSetCommands -> zSetCommands.zRandMember(rawKey(key), -count)).map(this::readValue);
}
@Override
@@ -136,7 +136,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.zRandMemberWithScore(rawKey(key))).map(this::readTypedTuple);
return createMono(zSetCommands -> zSetCommands.zRandMemberWithScore(rawKey(key))).map(this::readTypedTuple);
}
@Override
@@ -145,7 +145,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements");
return createFlux(connection -> connection.zRandMemberWithScore(rawKey(key), count)).map(this::readTypedTuple);
return createFlux(zSetCommands -> zSetCommands.zRandMemberWithScore(rawKey(key), count)).map(this::readTypedTuple);
}
@Override
@@ -154,7 +154,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements");
return createFlux(connection -> connection.zRandMemberWithScore(rawKey(key), -count)).map(this::readTypedTuple);
return createFlux(zSetCommands -> zSetCommands.zRandMemberWithScore(rawKey(key), -count)).map(this::readTypedTuple);
}
@Override
@@ -163,7 +163,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.zRank(rawKey(key), rawValue((V) o)));
return createMono(zSetCommands -> zSetCommands.zRank(rawKey(key), rawValue((V) o)));
}
@Override
@@ -172,7 +172,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.zRevRank(rawKey(key), rawValue((V) o)));
return createMono(zSetCommands -> zSetCommands.zRevRank(rawKey(key), rawValue((V) o)));
}
@Override
@@ -181,7 +181,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(connection -> connection.zRange(rawKey(key), range).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRange(rawKey(key), range).map(this::readValue));
}
@Override
@@ -190,7 +190,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(connection -> connection.zRangeWithScores(rawKey(key), range).map(this::readTypedTuple));
return createFlux(zSetCommands -> zSetCommands.zRangeWithScores(rawKey(key), range).map(this::readTypedTuple));
}
@Override
@@ -199,7 +199,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(connection -> connection.zRangeByScore(rawKey(key), range).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRangeByScore(rawKey(key), range).map(this::readValue));
}
@Override
@@ -208,7 +208,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(connection -> connection.zRangeByScoreWithScores(rawKey(key), range).map(this::readTypedTuple));
return createFlux(zSetCommands ->
zSetCommands.zRangeByScoreWithScores(rawKey(key), range).map(this::readTypedTuple));
}
@Override
@@ -217,7 +218,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(connection -> connection.zRangeByScore(rawKey(key), range, limit).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRangeByScore(rawKey(key), range, limit).map(this::readValue));
}
@Override
@@ -227,8 +228,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(range, "Range must not be null");
Assert.notNull(limit, "Limit must not be null");
return createFlux(
connection -> connection.zRangeByScoreWithScores(rawKey(key), range, limit).map(this::readTypedTuple));
return createFlux(zSetCommands ->
zSetCommands.zRangeByScoreWithScores(rawKey(key), range, limit).map(this::readTypedTuple));
}
@Override
@@ -237,7 +238,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(connection -> connection.zRevRange(rawKey(key), range).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRevRange(rawKey(key), range).map(this::readValue));
}
@Override
@@ -246,7 +247,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(connection -> connection.zRevRangeWithScores(rawKey(key), range).map(this::readTypedTuple));
return createFlux(zSetCommands ->
zSetCommands.zRevRangeWithScores(rawKey(key), range).map(this::readTypedTuple));
}
@Override
@@ -255,7 +257,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(connection -> connection.zRevRangeByScore(rawKey(key), range).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRevRangeByScore(rawKey(key), range).map(this::readValue));
}
@Override
@@ -264,8 +266,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(
connection -> connection.zRevRangeByScoreWithScores(rawKey(key), range).map(this::readTypedTuple));
return createFlux(zSetCommands ->
zSetCommands.zRevRangeByScoreWithScores(rawKey(key), range).map(this::readTypedTuple));
}
@Override
@@ -274,7 +276,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(connection -> connection.zRevRangeByScore(rawKey(key), range, limit).map(this::readValue));
return createFlux(zSetCommands ->
zSetCommands.zRevRangeByScore(rawKey(key), range, limit).map(this::readValue));
}
@Override
@@ -284,8 +287,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(range, "Range must not be null");
Assert.notNull(limit, "Limit must not be null");
return createFlux(
connection -> connection.zRevRangeByScoreWithScores(rawKey(key), range, limit).map(this::readTypedTuple));
return createFlux(zSetCommands ->
zSetCommands.zRevRangeByScoreWithScores(rawKey(key), range, limit).map(this::readTypedTuple));
}
@Override
@@ -296,7 +299,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(range, "Range must not be null");
Assert.notNull(limit, "Limit must not be null");
return createMono(connection -> connection.zRangeStoreByLex(rawKey(srcKey), rawKey(dstKey), range, limit));
return createMono(zSetCommands -> zSetCommands.zRangeStoreByLex(rawKey(srcKey), rawKey(dstKey), range, limit));
}
@Override
@@ -307,7 +310,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(range, "Range must not be null");
Assert.notNull(limit, "Limit must not be null");
return createMono(connection -> connection.zRangeStoreRevByLex(rawKey(srcKey), rawKey(dstKey), range, limit));
return createMono(zSetCommands ->
zSetCommands.zRangeStoreRevByLex(rawKey(srcKey), rawKey(dstKey), range, limit));
}
@Override
@@ -318,7 +322,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(range, "Range must not be null");
Assert.notNull(limit, "Limit must not be null");
return createMono(connection -> connection.zRangeStoreByScore(rawKey(srcKey), rawKey(dstKey), range, limit));
return createMono(zSetCommands ->
zSetCommands.zRangeStoreByScore(rawKey(srcKey), rawKey(dstKey), range, limit));
}
@Override
@@ -329,7 +334,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(range, "Range must not be null");
Assert.notNull(limit, "Limit must not be null");
return createMono(connection -> connection.zRangeStoreRevByScore(rawKey(srcKey), rawKey(dstKey), range, limit));
return createMono(zSetCommands ->
zSetCommands.zRangeStoreRevByScore(rawKey(srcKey), rawKey(dstKey), range, limit));
}
@Override
@@ -338,7 +344,8 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(options, "ScanOptions must not be null");
return createFlux(connection -> connection.zScan(rawKey(key), options).map(this::readTypedTuple));
return createFlux(zSetCommands ->
zSetCommands.zScan(rawKey(key), options).map(this::readTypedTuple));
}
@Override
@@ -347,7 +354,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createMono(connection -> connection.zCount(rawKey(key), range));
return createMono(zSetCommands -> zSetCommands.zCount(rawKey(key), range));
}
@Override
@@ -356,7 +363,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createMono(connection -> connection.zLexCount(rawKey(key), range));
return createMono(zSetCommands -> zSetCommands.zLexCount(rawKey(key), range));
}
@Override
@@ -364,7 +371,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.zPopMin(rawKey(key)).map(this::readTypedTuple));
return createMono(zSetCommands -> zSetCommands.zPopMin(rawKey(key)).map(this::readTypedTuple));
}
@Override
@@ -372,7 +379,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createFlux(connection -> connection.zPopMin(rawKey(key), count).map(this::readTypedTuple));
return createFlux(zSetCommands -> zSetCommands.zPopMin(rawKey(key), count).map(this::readTypedTuple));
}
@Override
@@ -381,7 +388,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Timeout must not be null");
return createMono(connection -> connection.bZPopMin(rawKey(key), timeout).map(this::readTypedTuple));
return createMono(zSetCommands -> zSetCommands.bZPopMin(rawKey(key), timeout).map(this::readTypedTuple));
}
@Override
@@ -389,7 +396,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.zPopMax(rawKey(key)).map(this::readTypedTuple));
return createMono(zSetCommands -> zSetCommands.zPopMax(rawKey(key)).map(this::readTypedTuple));
}
@Override
@@ -397,7 +404,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createFlux(connection -> connection.zPopMax(rawKey(key), count).map(this::readTypedTuple));
return createFlux(zSetCommands -> zSetCommands.zPopMax(rawKey(key), count).map(this::readTypedTuple));
}
@Override
@@ -406,7 +413,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Timeout must not be null");
return createMono(connection -> connection.bZPopMax(rawKey(key), timeout).map(this::readTypedTuple));
return createMono(zSetCommands -> zSetCommands.bZPopMax(rawKey(key), timeout).map(this::readTypedTuple));
}
@Override
@@ -414,7 +421,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.zCard(rawKey(key)));
return createMono(zSetCommands -> zSetCommands.zCard(rawKey(key)));
}
@Override
@@ -423,7 +430,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> connection.zScore(rawKey(key), rawValue((V) o)));
return createMono(zSetCommands -> zSetCommands.zScore(rawKey(key), rawValue((V) o)));
}
@Override
@@ -432,10 +439,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
return createMono(connection -> Flux.fromArray((V[]) o) //
return createMono(zSetCommands -> Flux.fromArray((V[]) o) //
.map(this::rawValue) //
.collectList() //
.flatMap(values -> connection.zMScore(rawKey(key), values)));
.flatMap(values -> zSetCommands.zMScore(rawKey(key), values)));
}
@Override
@@ -444,7 +451,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createMono(connection -> connection.zRemRangeByRank(rawKey(key), range));
return createMono(zSetCommands -> zSetCommands.zRemRangeByRank(rawKey(key), range));
}
@Override
@@ -453,7 +460,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createMono(connection -> connection.zRemRangeByLex(rawKey(key), range));
return createMono(zSetCommands -> zSetCommands.zRemRangeByLex(rawKey(key), range));
}
@Override
@@ -462,7 +469,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createMono(connection -> connection.zRemRangeByScore(rawKey(key), range));
return createMono(zSetCommands -> zSetCommands.zRemRangeByScore(rawKey(key), range));
}
@Override
@@ -471,10 +478,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(otherKeys, "Other keys must not be null");
return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(connection::zDiff).map(this::readValue));
.flatMapMany(zSetCommands::zDiff).map(this::readValue));
}
@Override
@@ -483,10 +490,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(otherKeys, "Other keys must not be null");
return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(connection::zDiffWithScores).map(this::readTypedTuple));
.flatMapMany(zSetCommands::zDiffWithScores).map(this::readTypedTuple));
}
@Override
@@ -496,10 +503,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(otherKeys, "Other keys must not be null");
Assert.notNull(destKey, "Destination key must not be null");
return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createMono(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMap(serialized -> connection.zDiffStore(rawKey(destKey), serialized)));
.flatMap(serialized -> zSetCommands.zDiffStore(rawKey(destKey), serialized)));
}
@@ -509,10 +516,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(otherKeys, "Other keys must not be null");
return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(connection::zInter).map(this::readValue));
.flatMapMany(zSetCommands::zInter).map(this::readValue));
}
@Override
@@ -521,10 +528,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(otherKeys, "Other keys must not be null");
return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(connection::zInterWithScores).map(this::readTypedTuple));
.flatMapMany(zSetCommands::zInterWithScores).map(this::readTypedTuple));
}
@Override
@@ -537,10 +544,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(aggregate, "Aggregate must not be null");
Assert.notNull(weights, "Weights must not be null");
return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(sets -> connection.zInterWithScores(sets, weights, aggregate)).map(this::readTypedTuple));
.flatMapMany(sets -> zSetCommands.zInterWithScores(sets, weights, aggregate)).map(this::readTypedTuple));
}
@Override
@@ -550,10 +557,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(otherKeys, "Other keys must not be null");
Assert.notNull(destKey, "Destination key must not be null");
return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createMono(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMap(serialized -> connection.zInterStore(rawKey(destKey), serialized)));
.flatMap(serialized -> zSetCommands.zInterStore(rawKey(destKey), serialized)));
}
@Override
@@ -565,10 +572,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(aggregate, "Aggregate must not be null");
Assert.notNull(weights, "Weights must not be null");
return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createMono(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMap(serialized -> connection.zInterStore(rawKey(destKey), serialized, weights, aggregate)));
.flatMap(serialized -> zSetCommands.zInterStore(rawKey(destKey), serialized, weights, aggregate)));
}
@Override
@@ -577,10 +584,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(otherKeys, "Other keys must not be null");
return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(connection::zUnion).map(this::readValue));
.flatMapMany(zSetCommands::zUnion).map(this::readValue));
}
@Override
@@ -589,10 +596,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(otherKeys, "Other keys must not be null");
return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(connection::zUnionWithScores).map(this::readTypedTuple));
.flatMapMany(zSetCommands::zUnionWithScores).map(this::readTypedTuple));
}
@Override
@@ -603,10 +610,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(aggregate, "Aggregate must not be null");
Assert.notNull(weights, "Weights must not be null");
return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMapMany(sets -> connection.zUnionWithScores(sets, weights, aggregate)).map(this::readTypedTuple));
.flatMapMany(sets -> zSetCommands.zUnionWithScores(sets, weights, aggregate)).map(this::readTypedTuple));
}
@Override
@@ -626,10 +633,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(otherKeys, "Other keys must not be null");
Assert.notNull(destKey, "Destination key must not be null");
return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createMono(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMap(serialized -> connection.zUnionStore(rawKey(destKey), serialized)));
.flatMap(serialized -> zSetCommands.zUnionStore(rawKey(destKey), serialized)));
}
@Override
@@ -641,10 +648,10 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(aggregate, "Aggregate must not be null");
Assert.notNull(weights, "Weights must not be null");
return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
return createMono(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) //
.map(this::rawKey) //
.collectList() //
.flatMap(serialized -> connection.zUnionStore(rawKey(destKey), serialized, weights, aggregate)));
.flatMap(serialized -> zSetCommands.zUnionStore(rawKey(destKey), serialized, weights, aggregate)));
}
@Override
@@ -653,7 +660,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(connection -> connection.zRangeByLex(rawKey(key), range).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRangeByLex(rawKey(key), range).map(this::readValue));
}
@Override
@@ -663,7 +670,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(range, "Range must not be null");
Assert.notNull(limit, "Limit must not be null");
return createFlux(connection -> connection.zRangeByLex(rawKey(key), range, limit).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRangeByLex(rawKey(key), range, limit).map(this::readValue));
}
@Override
@@ -672,7 +679,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(key, "Key must not be null");
Assert.notNull(range, "Range must not be null");
return createFlux(connection -> connection.zRevRangeByLex(rawKey(key), range).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRevRangeByLex(rawKey(key), range).map(this::readValue));
}
@Override
@@ -682,7 +689,7 @@ class DefaultReactiveZSetOperations<K, V> implements ReactiveZSetOperations<K, V
Assert.notNull(range, "Range must not be null");
Assert.notNull(limit, "Limit must not be null");
return createFlux(connection -> connection.zRevRangeByLex(rawKey(key), range, limit).map(this::readValue));
return createFlux(zSetCommands -> zSetCommands.zRevRangeByLex(rawKey(key), range, limit).map(this::readValue));
}
@Override