Upgrade to Jedis 5.0
Adapt to API changes in the Jedis 5.0 driver. Fix bzPopMaxShouldWorkCorrectly() and bzPopMinShouldWorkCorrectly() tests in JedisClusterConnectionTests. Jedis 5.0 changed the bzpopmax and bzpopmin Redis commands to no longer return an empty (Array)List internally when evaluating and popping from an empty sorted set. A NullPointerException will be thrown if either bzpopmax or bzpopmin commands are executd on an empty Redis sorted set in Jedis 5.0 (vs. Jedis 4.x): Closes #2612 Original pull request: #2716
This commit is contained in:
committed by
Christoph Strobl
parent
d63fd19519
commit
2075633809
@@ -30,13 +30,15 @@ import org.springframework.data.redis.core.Cursor;
|
||||
import org.springframework.data.redis.core.ScanCursor;
|
||||
import org.springframework.data.redis.core.ScanIteration;
|
||||
import org.springframework.data.redis.core.ScanOptions;
|
||||
import org.springframework.data.util.Streamable;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Cluster {@link RedisHashCommands} implementation for Jedis.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
* @author John Blum
|
||||
* @since 2.0
|
||||
*/
|
||||
class JedisClusterHashCommands implements RedisHashCommands {
|
||||
@@ -160,8 +162,8 @@ class JedisClusterHashCommands implements RedisHashCommands {
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
|
||||
try {
|
||||
Map<byte[], byte[]> map = connection.getCluster().hrandfieldWithValues(key, 1);
|
||||
return map.isEmpty() ? null : map.entrySet().iterator().next();
|
||||
List<Entry<byte[], byte[]>> mapEntryList = connection.getCluster().hrandfieldWithValues(key, 1);
|
||||
return mapEntryList.isEmpty() ? null : mapEntryList.get(0);
|
||||
} catch (Exception ex) {
|
||||
throw convertJedisAccessException(ex);
|
||||
}
|
||||
@@ -185,8 +187,7 @@ class JedisClusterHashCommands implements RedisHashCommands {
|
||||
public List<Entry<byte[], byte[]>> hRandFieldWithValues(byte[] key, long count) {
|
||||
|
||||
try {
|
||||
Map<byte[], byte[]> map = connection.getCluster().hrandfieldWithValues(key, count);
|
||||
return Streamable.of(() -> map.entrySet().iterator()).toList();
|
||||
return connection.getCluster().hrandfieldWithValues(key, count);
|
||||
} catch (Exception ex) {
|
||||
throw convertJedisAccessException(ex);
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -249,21 +250,26 @@ class JedisClusterServerCommands implements RedisClusterServerCommands {
|
||||
|
||||
Assert.notNull(pattern, "Pattern must not be null");
|
||||
|
||||
List<NodeResult<List<String>>> mapResult = connection.getClusterCommandExecutor()
|
||||
.executeCommandOnAllNodes((JedisClusterCommandCallback<List<String>>) client -> client.configGet(pattern))
|
||||
JedisClusterCommandCallback<Map<String, String>> command = jedis -> jedis.configGet(pattern);
|
||||
|
||||
List<NodeResult<Map<String, String>>> nodeResults = connection.getClusterCommandExecutor()
|
||||
.executeCommandOnAllNodes(command)
|
||||
.getResults();
|
||||
|
||||
List<String> result = new ArrayList<>();
|
||||
for (NodeResult<List<String>> entry : mapResult) {
|
||||
Properties nodesConfiguration = new Properties();
|
||||
|
||||
String prefix = entry.getNode().asString();
|
||||
int i = 0;
|
||||
for (String value : entry.getValue()) {
|
||||
result.add((i++ % 2 == 0 ? (prefix + ".") : "") + value);
|
||||
for (NodeResult<Map<String, String>> nodeResult : nodeResults) {
|
||||
|
||||
String prefix = nodeResult.getNode().asString();
|
||||
|
||||
for (Entry<String, String> entry : nodeResult.getValue().entrySet()) {
|
||||
String newKey = prefix.concat(".").concat(entry.getKey());
|
||||
String value = entry.getValue();
|
||||
nodesConfiguration.setProperty(newKey, value);
|
||||
}
|
||||
}
|
||||
|
||||
return Converters.toProperties(result);
|
||||
return nodesConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -17,9 +17,11 @@ package org.springframework.data.redis.connection.jedis;
|
||||
|
||||
import static org.springframework.data.redis.connection.jedis.StreamConverters.*;
|
||||
|
||||
import org.springframework.util.StringUtils;
|
||||
import redis.clients.jedis.BuilderFactory;
|
||||
import redis.clients.jedis.params.XAddParams;
|
||||
import redis.clients.jedis.params.XClaimParams;
|
||||
import redis.clients.jedis.params.XPendingParams;
|
||||
import redis.clients.jedis.params.XReadGroupParams;
|
||||
import redis.clients.jedis.params.XReadParams;
|
||||
|
||||
@@ -269,9 +271,19 @@ class JedisClusterStreamCommands implements RedisStreamCommands {
|
||||
|
||||
try {
|
||||
|
||||
List<Object> response = connection.getCluster().xpending(key, group,
|
||||
JedisConverters.toBytes(getLowerValue(range)), JedisConverters.toBytes(getUpperValue(range)),
|
||||
options.getCount().intValue(), JedisConverters.toBytes(options.getConsumerName()));
|
||||
@SuppressWarnings("all")
|
||||
XPendingParams pendingParams = new XPendingParams(
|
||||
JedisConverters.toBytes(StreamConverters.getLowerValue(range)),
|
||||
JedisConverters.toBytes(StreamConverters.getUpperValue(range)),
|
||||
options.getCount().intValue());
|
||||
|
||||
String consumerName = options.getConsumerName();
|
||||
|
||||
if (StringUtils.hasText(consumerName)) {
|
||||
pendingParams = pendingParams.consumer(consumerName);
|
||||
}
|
||||
|
||||
List<Object> response = connection.getCluster().xpending(key, group, pendingParams);
|
||||
|
||||
return StreamConverters.toPendingMessages(groupName, range,
|
||||
BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(response));
|
||||
|
||||
@@ -20,6 +20,7 @@ import redis.clients.jedis.params.ScanParams;
|
||||
import redis.clients.jedis.params.ZParams;
|
||||
import redis.clients.jedis.params.ZRangeParams;
|
||||
import redis.clients.jedis.resps.ScanResult;
|
||||
import redis.clients.jedis.util.KeyValue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashSet;
|
||||
@@ -34,7 +35,6 @@ import org.springframework.data.redis.connection.ClusterSlotHashUtil;
|
||||
import org.springframework.data.redis.connection.RedisZSetCommands;
|
||||
import org.springframework.data.redis.connection.convert.SetConverter;
|
||||
import org.springframework.data.redis.connection.zset.Aggregate;
|
||||
import org.springframework.data.redis.connection.zset.DefaultTuple;
|
||||
import org.springframework.data.redis.connection.zset.Tuple;
|
||||
import org.springframework.data.redis.connection.zset.Weights;
|
||||
import org.springframework.data.redis.core.Cursor;
|
||||
@@ -46,18 +46,22 @@ import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Cluster {@link RedisZSetCommands} implementation for Jedis.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
* @author Clement Ong
|
||||
* @author Andrey Shlykov
|
||||
* @author Jens Deppe
|
||||
* @author Shyngys Sapraliyev
|
||||
* @author John Blum
|
||||
* @since 2.0
|
||||
*/
|
||||
class JedisClusterZSetCommands implements RedisZSetCommands {
|
||||
|
||||
private static final SetConverter<redis.clients.jedis.resps.Tuple, Tuple> TUPLE_SET_CONVERTER = new SetConverter<>(
|
||||
JedisConverters::toTuple);
|
||||
private static final SetConverter<redis.clients.jedis.resps.Tuple, Tuple> TUPLE_SET_CONVERTER =
|
||||
new SetConverter<>(JedisConverters::toTuple);
|
||||
|
||||
private final JedisClusterConnection connection;
|
||||
|
||||
JedisClusterZSetCommands(JedisClusterConnection connection) {
|
||||
@@ -818,7 +822,7 @@ class JedisClusterZSetCommands implements RedisZSetCommands {
|
||||
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {
|
||||
|
||||
try {
|
||||
return connection.getCluster().zdiff(sets);
|
||||
return JedisConverters.toSet(connection.getCluster().zdiff(sets));
|
||||
} catch (Exception ex) {
|
||||
throw convertJedisAccessException(ex);
|
||||
}
|
||||
@@ -835,7 +839,7 @@ class JedisClusterZSetCommands implements RedisZSetCommands {
|
||||
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {
|
||||
|
||||
try {
|
||||
return JedisConverters.toTupleSet(connection.getCluster().zdiffWithScores(sets));
|
||||
return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster().zdiffWithScores(sets)));
|
||||
} catch (Exception ex) {
|
||||
throw convertJedisAccessException(ex);
|
||||
}
|
||||
@@ -872,7 +876,7 @@ class JedisClusterZSetCommands implements RedisZSetCommands {
|
||||
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {
|
||||
|
||||
try {
|
||||
return connection.getCluster().zinter(new ZParams(), sets);
|
||||
return JedisConverters.toSet(connection.getCluster().zinter(new ZParams(), sets));
|
||||
} catch (Exception ex) {
|
||||
throw convertJedisAccessException(ex);
|
||||
}
|
||||
@@ -889,7 +893,8 @@ class JedisClusterZSetCommands implements RedisZSetCommands {
|
||||
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {
|
||||
|
||||
try {
|
||||
return JedisConverters.toTupleSet(connection.getCluster().zinterWithScores(new ZParams(), sets));
|
||||
return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster()
|
||||
.zinterWithScores(new ZParams(), sets)));
|
||||
} catch (Exception ex) {
|
||||
throw convertJedisAccessException(ex);
|
||||
}
|
||||
@@ -909,8 +914,8 @@ class JedisClusterZSetCommands implements RedisZSetCommands {
|
||||
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {
|
||||
|
||||
try {
|
||||
return JedisConverters
|
||||
.toTupleSet(connection.getCluster().zinterWithScores(toZParams(aggregate, weights), sets));
|
||||
return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster()
|
||||
.zinterWithScores(toZParams(aggregate, weights), sets)));
|
||||
} catch (Exception ex) {
|
||||
throw convertJedisAccessException(ex);
|
||||
}
|
||||
@@ -971,7 +976,7 @@ class JedisClusterZSetCommands implements RedisZSetCommands {
|
||||
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {
|
||||
|
||||
try {
|
||||
return connection.getCluster().zunion(new ZParams(), sets);
|
||||
return JedisConverters.toSet(connection.getCluster().zunion(new ZParams(), sets));
|
||||
} catch (Exception ex) {
|
||||
throw convertJedisAccessException(ex);
|
||||
}
|
||||
@@ -988,7 +993,8 @@ class JedisClusterZSetCommands implements RedisZSetCommands {
|
||||
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {
|
||||
|
||||
try {
|
||||
return JedisConverters.toTupleSet(connection.getCluster().zunionWithScores(new ZParams(), sets));
|
||||
return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster()
|
||||
.zunionWithScores(new ZParams(), sets)));
|
||||
} catch (Exception ex) {
|
||||
throw convertJedisAccessException(ex);
|
||||
}
|
||||
@@ -1008,10 +1014,11 @@ class JedisClusterZSetCommands implements RedisZSetCommands {
|
||||
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {
|
||||
|
||||
try {
|
||||
return JedisConverters
|
||||
.toTupleSet(connection.getCluster().zunionWithScores(toZParams(aggregate, weights), sets));
|
||||
return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster()
|
||||
.zunionWithScores(toZParams(aggregate, weights), sets)));
|
||||
} catch (Exception ex) {
|
||||
throw convertJedisAccessException(ex);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1126,21 +1133,14 @@ class JedisClusterZSetCommands implements RedisZSetCommands {
|
||||
return new ZParams().weights(weights.toArray()).aggregate(ZParams.Aggregate.valueOf(aggregate.name()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Workaround for broken Jedis BZPOP signature.
|
||||
*
|
||||
* @param bytes
|
||||
* @return
|
||||
*/
|
||||
@Nullable
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Tuple toTuple(@Nullable List<?> bytes) {
|
||||
private static Tuple toTuple(@Nullable KeyValue<?, redis.clients.jedis.resps.Tuple> keyValue) {
|
||||
|
||||
if (bytes == null || bytes.isEmpty()) {
|
||||
return null;
|
||||
if (keyValue != null) {
|
||||
redis.clients.jedis.resps.Tuple tuple = keyValue.getValue();
|
||||
return tuple != null ? JedisConverters.toTuple(tuple) : null;
|
||||
}
|
||||
|
||||
return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2))));
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -59,9 +59,6 @@ import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import redis.clients.jedis.BuilderFactory;
|
||||
import redis.clients.jedis.CommandArguments;
|
||||
import redis.clients.jedis.CommandObject;
|
||||
@@ -130,6 +127,7 @@ public class JedisConnection extends AbstractRedisConnection {
|
||||
|
||||
private final Log LOGGER = LogFactory.getLog(getClass());
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private List<JedisResult> pipelinedResults = new ArrayList<>();
|
||||
|
||||
private final @Nullable Pool<Jedis> pool;
|
||||
@@ -348,7 +346,6 @@ public class JedisConnection extends AbstractRedisConnection {
|
||||
jedis.close();
|
||||
}
|
||||
else {
|
||||
doExceptionThrowingOperationSafely(jedis::quit, "Failed to quit during close");
|
||||
doExceptionThrowingOperationSafely(jedis::disconnect, "Failed to disconnect during close");
|
||||
}
|
||||
}
|
||||
@@ -480,6 +477,7 @@ public class JedisConnection extends AbstractRedisConnection {
|
||||
public List<Object> exec() {
|
||||
|
||||
try {
|
||||
|
||||
if (transaction == null) {
|
||||
throw new InvalidDataAccessApiUsageException("No ongoing transaction; Did you forget to call multi");
|
||||
}
|
||||
@@ -489,6 +487,7 @@ public class JedisConnection extends AbstractRedisConnection {
|
||||
return !CollectionUtils.isEmpty(results)
|
||||
? new TransactionResultConverter<>(txResults, JedisExceptionConverter.INSTANCE).convert(results)
|
||||
: results;
|
||||
|
||||
} catch (Exception cause) {
|
||||
throw convertJedisAccessException(cause);
|
||||
} finally {
|
||||
|
||||
@@ -35,6 +35,7 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@@ -87,6 +88,8 @@ import org.springframework.util.Assert;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import redis.clients.jedis.util.SafeEncoder;
|
||||
|
||||
/**
|
||||
* Jedis type converters.
|
||||
*
|
||||
@@ -114,12 +117,22 @@ abstract class JedisConverters extends Converters {
|
||||
MINUS_BYTES = toBytes("-");
|
||||
POSITIVE_INFINITY_BYTES = toBytes("+inf");
|
||||
NEGATIVE_INFINITY_BYTES = toBytes("-inf");
|
||||
|
||||
}
|
||||
|
||||
@Nullable
|
||||
static <T> Set<T> toSet(@Nullable List<T> list) {
|
||||
return list != null ? new LinkedHashSet<>(list) : null;
|
||||
}
|
||||
|
||||
public static Converter<String, byte[]> stringToBytes() {
|
||||
return JedisConverters::toBytes;
|
||||
}
|
||||
|
||||
static ListConverter<String, byte[]> stringListToByteList() {
|
||||
return new ListConverter<>(stringToBytes());
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link ListConverter} converting jedis {@link redis.clients.jedis.resps.Tuple} to {@link Tuple}.
|
||||
*
|
||||
@@ -129,18 +142,18 @@ abstract class JedisConverters extends Converters {
|
||||
return new ListConverter<>(JedisConverters::toTuple);
|
||||
}
|
||||
|
||||
static ListConverter<String, byte[]> stringListToByteList() {
|
||||
return new ListConverter<>(stringToBytes());
|
||||
static Tuple toTuple(redis.clients.jedis.resps.Tuple source) {
|
||||
return new DefaultTuple(source.getBinaryElement(), source.getScore());
|
||||
}
|
||||
|
||||
static List<Tuple> toTupleList(List<redis.clients.jedis.resps.Tuple> source) {
|
||||
return tuplesToTuples().convert(source);
|
||||
}
|
||||
|
||||
static Set<Tuple> toTupleSet(Set<redis.clients.jedis.resps.Tuple> source) {
|
||||
return new SetConverter<>(JedisConverters::toTuple).convert(source);
|
||||
}
|
||||
|
||||
public static Tuple toTuple(redis.clients.jedis.resps.Tuple source) {
|
||||
return new DefaultTuple(source.getBinaryElement(), source.getScore());
|
||||
}
|
||||
|
||||
/**
|
||||
* Map a {@link Set} of {@link Tuple} by {@code value} to its {@code score}.
|
||||
*
|
||||
@@ -255,6 +268,7 @@ abstract class JedisConverters extends Converters {
|
||||
public static SortingParams toSortingParams(@Nullable SortParameters params) {
|
||||
|
||||
SortingParams jedisParams = null;
|
||||
|
||||
if (params != null) {
|
||||
jedisParams = new SortingParams();
|
||||
byte[] byPattern = params.getByPattern();
|
||||
@@ -278,6 +292,7 @@ abstract class JedisConverters extends Converters {
|
||||
jedisParams.alpha();
|
||||
}
|
||||
}
|
||||
|
||||
return jedisParams;
|
||||
}
|
||||
|
||||
@@ -386,8 +401,10 @@ abstract class JedisConverters extends Converters {
|
||||
* @since 2.6
|
||||
*/
|
||||
static GetExParams toGetExParams(Expiration expiration) {
|
||||
return toGetExParams(expiration, new GetExParams());
|
||||
}
|
||||
|
||||
GetExParams params = new GetExParams();
|
||||
static GetExParams toGetExParams(Expiration expiration, GetExParams params) {
|
||||
|
||||
if (expiration.isPersistent()) {
|
||||
return params.persist();
|
||||
@@ -584,18 +601,14 @@ abstract class JedisConverters extends Converters {
|
||||
return new ZAddParams();
|
||||
}
|
||||
|
||||
ZAddParams target = new ZAddParams() {
|
||||
|
||||
{
|
||||
if (source.contains(ZAddArgs.Flag.GT)) {
|
||||
addParam("gt");
|
||||
}
|
||||
if (source.contains(ZAddArgs.Flag.LT)) {
|
||||
addParam("lt");
|
||||
}
|
||||
}
|
||||
};
|
||||
ZAddParams target = new ZAddParams();
|
||||
|
||||
if (source.contains(ZAddArgs.Flag.GT)) {
|
||||
target.gt();
|
||||
}
|
||||
if (source.contains(ZAddArgs.Flag.LT)) {
|
||||
target.lt();
|
||||
}
|
||||
if (source.contains(ZAddArgs.Flag.XX)) {
|
||||
target.xx();
|
||||
}
|
||||
@@ -605,6 +618,7 @@ abstract class JedisConverters extends Converters {
|
||||
if (source.contains(ZAddArgs.Flag.CH)) {
|
||||
target.ch();
|
||||
}
|
||||
|
||||
return target;
|
||||
}
|
||||
|
||||
|
||||
@@ -37,8 +37,11 @@ import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* {@link RedisHashCommands} implementation for Jedis.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
* @author John Blum
|
||||
* @since 2.0
|
||||
*/
|
||||
class JedisHashCommands implements RedisHashCommands {
|
||||
@@ -122,7 +125,7 @@ class JedisHashCommands implements RedisHashCommands {
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
|
||||
return connection.invoke().from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, 1L)
|
||||
.get(it -> it.isEmpty() ? null : it.entrySet().iterator().next());
|
||||
.get(mapEntryList -> mapEntryList.isEmpty() ? null : mapEntryList.get(0));
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@@ -141,12 +144,16 @@ class JedisHashCommands implements RedisHashCommands {
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
|
||||
return connection.invoke()
|
||||
.from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, count).get(it -> {
|
||||
.from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, count)
|
||||
.get(mapEntryList -> {
|
||||
|
||||
List<Entry<byte[], byte[]>> entries = new ArrayList<>(it.size());
|
||||
it.forEach((k, v) -> entries.add(Converters.entryOf(k, v)));
|
||||
List<Entry<byte[], byte[]>> convertedMapEntryList = new ArrayList<>(mapEntryList.size());
|
||||
|
||||
mapEntryList.forEach(entry ->
|
||||
convertedMapEntryList.add(Converters.entryOf(entry.getKey(), entry.getValue())));
|
||||
|
||||
return convertedMapEntryList;
|
||||
|
||||
return entries;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@ package org.springframework.data.redis.connection.jedis;
|
||||
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.Pipeline;
|
||||
import redis.clients.jedis.Queable;
|
||||
import redis.clients.jedis.Response;
|
||||
import redis.clients.jedis.Transaction;
|
||||
import redis.clients.jedis.commands.DatabasePipelineCommands;
|
||||
@@ -47,7 +46,7 @@ import org.springframework.util.Assert;
|
||||
* composing a functional pipeline to transform the result using a {@link Converter}.
|
||||
* <p>
|
||||
* Usage example:
|
||||
*
|
||||
* <p>
|
||||
* <pre class="code">
|
||||
* JedisInvoker invoker = …;
|
||||
*
|
||||
@@ -62,6 +61,7 @@ import org.springframework.util.Assert;
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
* @author John Blum
|
||||
* @since 2.5
|
||||
*/
|
||||
class JedisInvoker {
|
||||
@@ -937,10 +937,7 @@ class JedisInvoker {
|
||||
|
||||
@Override
|
||||
public <T> T get(Converter<S, T> converter) {
|
||||
|
||||
Assert.notNull(converter, "Converter must not be null");
|
||||
|
||||
return synchronizer.invoke(parentFunction, parentPipelineFunction, converter, () -> null);
|
||||
return getOrElse(converter, () -> null);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@@ -959,6 +956,7 @@ class JedisInvoker {
|
||||
private final Function<ResponseCommands, Response<Collection<S>>> parentPipelineFunction;
|
||||
private final Synchronizer synchronizer;
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
DefaultManyInvocationSpec(Function<Jedis, ? extends Collection<S>> parentFunction,
|
||||
Function<ResponseCommands, Response<? extends Collection<S>>> parentPipelineFunction,
|
||||
Synchronizer synchronizer) {
|
||||
@@ -969,6 +967,7 @@ class JedisInvoker {
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("all")
|
||||
public <T> List<T> toList(Converter<S, T> converter) {
|
||||
|
||||
Assert.notNull(converter, "Converter must not be null");
|
||||
@@ -981,15 +980,17 @@ class JedisInvoker {
|
||||
|
||||
List<T> result = new ArrayList<>(source.size());
|
||||
|
||||
for (S s : source) {
|
||||
result.add(converter.convert(s));
|
||||
for (S element : source) {
|
||||
result.add(converter.convert(element));
|
||||
}
|
||||
|
||||
return result;
|
||||
|
||||
}, Collections::emptyList);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("all")
|
||||
public <T> Set<T> toSet(Converter<S, T> converter) {
|
||||
|
||||
Assert.notNull(converter, "Converter must not be null");
|
||||
@@ -1002,11 +1003,12 @@ class JedisInvoker {
|
||||
|
||||
Set<T> result = new LinkedHashSet<>(source.size());
|
||||
|
||||
for (S s : source) {
|
||||
result.add(converter.convert(s));
|
||||
for (S element : source) {
|
||||
result.add(converter.convert(element));
|
||||
}
|
||||
|
||||
return result;
|
||||
|
||||
}, Collections::emptySet);
|
||||
}
|
||||
}
|
||||
@@ -1020,6 +1022,7 @@ class JedisInvoker {
|
||||
@Nullable
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
default <I, T> T invoke(Function<Jedis, I> callFunction, Function<ResponseCommands, Response<I>> pipelineFunction) {
|
||||
|
||||
return (T) doInvoke((Function) callFunction, (Function) pipelineFunction, Converters.identityConverter(),
|
||||
() -> null);
|
||||
}
|
||||
@@ -1046,15 +1049,13 @@ class JedisInvoker {
|
||||
/**
|
||||
* Create a proxy to invoke methods dynamically on {@link Pipeline} or {@link Transaction} as those share many
|
||||
* commands that are not defined on a common super-type.
|
||||
*
|
||||
* @param pipelineOrTransaction
|
||||
* @return
|
||||
*/
|
||||
static ResponseCommands createCommands(Queable pipelineOrTransaction) {
|
||||
static ResponseCommands createCommands(Object pipelineOrTransaction) {
|
||||
|
||||
ProxyFactory proxyFactory = new ProxyFactory(pipelineOrTransaction);
|
||||
|
||||
proxyFactory.addInterface(ResponseCommands.class);
|
||||
|
||||
return (ResponseCommands) proxyFactory.getProxy(JedisInvoker.class.getClassLoader());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import redis.clients.jedis.params.ScanParams;
|
||||
import redis.clients.jedis.params.ZParams;
|
||||
import redis.clients.jedis.params.ZRangeParams;
|
||||
import redis.clients.jedis.resps.ScanResult;
|
||||
import redis.clients.jedis.util.KeyValue;
|
||||
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
@@ -31,7 +32,6 @@ import java.util.concurrent.TimeUnit;
|
||||
import org.springframework.dao.InvalidDataAccessApiUsageException;
|
||||
import org.springframework.data.redis.connection.RedisZSetCommands;
|
||||
import org.springframework.data.redis.connection.zset.Aggregate;
|
||||
import org.springframework.data.redis.connection.zset.DefaultTuple;
|
||||
import org.springframework.data.redis.connection.zset.Tuple;
|
||||
import org.springframework.data.redis.connection.zset.Weights;
|
||||
import org.springframework.data.redis.core.Cursor;
|
||||
@@ -42,11 +42,14 @@ import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* {@link RedisZSetCommands} implementation for Jedis.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Clement Ong
|
||||
* @author Mark Paluch
|
||||
* @author Andrey Shlykov
|
||||
* @author Shyngys Sapraliyev
|
||||
* @author John Blum
|
||||
* @since 2.0
|
||||
*/
|
||||
class JedisZSetCommands implements RedisZSetCommands {
|
||||
@@ -74,8 +77,10 @@ class JedisZSetCommands implements RedisZSetCommands {
|
||||
Assert.notNull(key, "Key must not be null");
|
||||
Assert.notNull(tuples, "Tuples must not be null");
|
||||
|
||||
return connection.invoke().just(Jedis::zadd, PipelineBinaryCommands::zadd, key, JedisConverters.toTupleMap(tuples),
|
||||
JedisConverters.toZAddParams(args));
|
||||
Long count = connection.invoke().just(Jedis::zadd, PipelineBinaryCommands::zadd, key,
|
||||
JedisConverters.toTupleMap(tuples), JedisConverters.toZAddParams(args));
|
||||
|
||||
return count != null ? count : 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -424,7 +429,7 @@ class JedisZSetCommands implements RedisZSetCommands {
|
||||
|
||||
Assert.notNull(sets, "Sets must not be null");
|
||||
|
||||
return connection.invoke().just(Jedis::zdiff, PipelineBinaryCommands::zdiff, sets);
|
||||
return connection.invoke().fromMany(Jedis::zdiff, PipelineBinaryCommands::zdiff, sets).toSet();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -450,7 +455,7 @@ class JedisZSetCommands implements RedisZSetCommands {
|
||||
|
||||
Assert.notNull(sets, "Sets must not be null");
|
||||
|
||||
return connection.invoke().just(Jedis::zinter, PipelineBinaryCommands::zinter, new ZParams(), sets);
|
||||
return connection.invoke().fromMany(Jedis::zinter, PipelineBinaryCommands::zinter, new ZParams(), sets).toSet();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -504,7 +509,7 @@ class JedisZSetCommands implements RedisZSetCommands {
|
||||
|
||||
Assert.notNull(sets, "Sets must not be null");
|
||||
|
||||
return connection.invoke().just(Jedis::zunion, PipelineBinaryCommands::zunion, new ZParams(), sets);
|
||||
return connection.invoke().fromMany(Jedis::zunion, PipelineBinaryCommands::zunion, new ZParams(), sets).toSet();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -772,21 +777,8 @@ class JedisZSetCommands implements RedisZSetCommands {
|
||||
return zRangeParams;
|
||||
}
|
||||
|
||||
/**
|
||||
* Workaround for broken Jedis BZPOP signature.
|
||||
*
|
||||
* @param bytes
|
||||
* @return
|
||||
*/
|
||||
@Nullable
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Tuple toTuple(List<?> bytes) {
|
||||
|
||||
if (bytes.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2))));
|
||||
private static Tuple toTuple(@Nullable KeyValue<?, redis.clients.jedis.resps.Tuple> keyValue) {
|
||||
return keyValue != null ? JedisConverters.toTuple(keyValue.getValue()) : null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -24,17 +24,20 @@ import org.springframework.lang.Nullable;
|
||||
*
|
||||
* @author Costin Leau
|
||||
* @author Christoph Strobl
|
||||
* @author John Blum
|
||||
*/
|
||||
public class DefaultTuple implements Tuple {
|
||||
|
||||
private static final Double ZERO = 0.0d;
|
||||
|
||||
private final Double score;
|
||||
private final byte[] value;
|
||||
|
||||
/**
|
||||
* Constructs a new <code>DefaultTuple</code> instance.
|
||||
* Constructs a new {@link DefaultTuple}.
|
||||
*
|
||||
* @param value
|
||||
* @param score
|
||||
* @param value {@link byte[]} of the member's raw value.
|
||||
* @param score {@link Double score} of the raw value used in sorting.
|
||||
*/
|
||||
public DefaultTuple(byte[] value, Double score) {
|
||||
|
||||
|
||||
Reference in New Issue
Block a user