Polish enhanced switch statements.

See #2705
Original pull request: #2706
This commit is contained in:
John Blum
2023-09-11 15:59:56 -07:00
parent 5e23156f78
commit ea4acad379
15 changed files with 644 additions and 559 deletions

View File

@@ -70,6 +70,7 @@ import org.springframework.util.Assert;
* @author Chen Guanqun
* @author Pavel Khokhlov
* @author Liming Deng
* @author John Blum
* @since 1.7
*/
public class JedisClusterConnection implements RedisClusterConnection {
@@ -168,13 +169,15 @@ public class JedisClusterConnection implements RedisClusterConnection {
Assert.notNull(command, "Command must not be null");
Assert.notNull(args, "Args must not be null");
return clusterCommandExecutor.executeCommandOnArbitraryNode(
(JedisClusterCommandCallback<Object>) client -> client.sendCommand(JedisClientUtils.getCommand(command), args))
.getValue();
JedisClusterCommandCallback<Object> commandCallback = jedis ->
jedis.sendCommand(JedisClientUtils.getCommand(command), args);
return this.clusterCommandExecutor.executeCommandOnArbitraryNode(commandCallback).getValue();
}
@Nullable
@Override
@SuppressWarnings("unchecked")
public <T> T execute(String command, byte[] key, Collection<byte[]> args) {
Assert.notNull(command, "Command must not be null");
@@ -183,11 +186,12 @@ public class JedisClusterConnection implements RedisClusterConnection {
byte[][] commandArgs = getCommandArguments(key, args);
RedisClusterNode keyMaster = topologyProvider.getTopology().getKeyServingMasterNode(key);
RedisClusterNode keyMaster = this.topologyProvider.getTopology().getKeyServingMasterNode(key);
return clusterCommandExecutor.executeCommandOnSingleNode((JedisClusterCommandCallback<T>) client -> {
return (T) client.sendCommand(JedisClientUtils.getCommand(command), commandArgs);
}, keyMaster).getValue();
JedisClusterCommandCallback<T> commandCallback = jedis ->
(T) jedis.sendCommand(JedisClientUtils.getCommand(command), commandArgs);
return this.clusterCommandExecutor.executeCommandOnSingleNode(commandCallback, keyMaster).getValue();
}
private static byte[][] getCommandArguments(byte[] key, Collection<byte[]> args) {
@@ -195,6 +199,7 @@ public class JedisClusterConnection implements RedisClusterConnection {
byte[][] commandArgs = new byte[args.size() + 1][];
commandArgs[0] = key;
int targetIndex = 1;
for (byte[] binaryArgument : args) {
@@ -226,15 +231,17 @@ public class JedisClusterConnection implements RedisClusterConnection {
* @since 2.1
*/
@Nullable
@SuppressWarnings("unchecked")
public <T> List<T> execute(String command, Collection<byte[]> keys, Collection<byte[]> args) {
Assert.notNull(command, "Command must not be null");
Assert.notNull(keys, "Key must not be null");
Assert.notNull(args, "Args must not be null");
return clusterCommandExecutor.executeMultiKeyCommand((JedisMultiKeyClusterCommandCallback<T>) (client, key) -> {
return (T) client.sendCommand(JedisClientUtils.getCommand(command), getCommandArguments(key, args));
}, keys).resultsAsList();
JedisMultiKeyClusterCommandCallback<T> commandCallback = (jedis, key) ->
(T) jedis.sendCommand(JedisClientUtils.getCommand(command), getCommandArguments(key, args));
return this.clusterCommandExecutor.executeMultiKeyCommand(commandCallback, keys).resultsAsList();
}
@@ -345,18 +352,19 @@ public class JedisClusterConnection implements RedisClusterConnection {
@Override
public boolean isSubscribed() {
return (subscription != null && subscription.isAlive());
return (this.subscription != null && this.subscription.isAlive());
}
@Override
public Subscription getSubscription() {
return subscription;
return this.subscription;
}
@Override
public Long publish(byte[] channel, byte[] message) {
try {
return cluster.publish(channel, message);
return this.cluster.publish(channel, message);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
@@ -366,15 +374,15 @@ public class JedisClusterConnection implements RedisClusterConnection {
public void subscribe(MessageListener listener, byte[]... channels) {
if (isSubscribed()) {
throw new RedisSubscribedConnectionException(
"Connection already subscribed; use the connection Subscription to cancel or add new channels");
String message = "Connection already subscribed; use the connection Subscription to cancel or add new channels";
throw new RedisSubscribedConnectionException(message);
}
try {
JedisMessageListener jedisPubSub = new JedisMessageListener(listener);
subscription = new JedisSubscription(listener, jedisPubSub, channels, null);
cluster.subscribe(jedisPubSub, channels);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
} catch (Exception cause) {
throw convertJedisAccessException(cause);
}
}
@@ -382,15 +390,16 @@ public class JedisClusterConnection implements RedisClusterConnection {
public void pSubscribe(MessageListener listener, byte[]... patterns) {
if (isSubscribed()) {
throw new RedisSubscribedConnectionException(
"Connection already subscribed; use the connection Subscription to cancel or add new channels");
String message = "Connection already subscribed; use the connection Subscription to cancel or add new channels";
throw new RedisSubscribedConnectionException(message);
}
try {
JedisMessageListener jedisPubSub = new JedisMessageListener(listener);
subscription = new JedisSubscription(listener, jedisPubSub, null, patterns);
cluster.psubscribe(jedisPubSub, patterns);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
} catch (Exception cause) {
throw convertJedisAccessException(cause);
}
}
@@ -407,19 +416,20 @@ public class JedisClusterConnection implements RedisClusterConnection {
throw new InvalidDataAccessApiUsageException("Echo not supported in cluster mode");
}
@Override
@Override @Nullable
public String ping() {
return !clusterCommandExecutor.executeCommandOnAllNodes((JedisClusterCommandCallback<String>) Jedis::ping)
.resultsAsList().isEmpty() ? "PONG" : null;
JedisClusterCommandCallback<String> command = Jedis::ping;
return !this.clusterCommandExecutor.executeCommandOnAllNodes(command).resultsAsList().isEmpty() ? "PONG" : null;
}
@Override
public String ping(RedisClusterNode node) {
return clusterCommandExecutor.executeCommandOnSingleNode((JedisClusterCommandCallback<String>) Jedis::ping, node)
.getValue();
JedisClusterCommandCallback<String> command = Jedis::ping;
return this.clusterCommandExecutor.executeCommandOnSingleNode(command, node).getValue();
}
/*
@@ -432,16 +442,17 @@ public class JedisClusterConnection implements RedisClusterConnection {
Assert.notNull(node, "Node must not be null");
Assert.notNull(mode, "AddSlots mode must not be null");
RedisClusterNode nodeToUse = topologyProvider.getTopology().lookup(node);
RedisClusterNode nodeToUse = this.topologyProvider.getTopology().lookup(node);
String nodeId = nodeToUse.getId();
clusterCommandExecutor.executeCommandOnSingleNode((JedisClusterCommandCallback<String>) client -> switch (mode) {
case IMPORTING -> client.clusterSetSlotImporting(slot, nodeId);
case MIGRATING -> client.clusterSetSlotMigrating(slot, nodeId);
case STABLE -> client.clusterSetSlotStable(slot);
case NODE -> client.clusterSetSlotNode(slot, nodeId);
}, node);
JedisClusterCommandCallback<String> command = jedis -> switch (mode) {
case IMPORTING -> jedis.clusterSetSlotImporting(slot, nodeId);
case MIGRATING -> jedis.clusterSetSlotMigrating(slot, nodeId);
case STABLE -> jedis.clusterSetSlotStable(slot);
case NODE -> jedis.clusterSetSlotNode(slot, nodeId);
};
this.clusterCommandExecutor.executeCommandOnSingleNode(command, node);
}
@Override
@@ -449,20 +460,24 @@ public class JedisClusterConnection implements RedisClusterConnection {
RedisClusterNode node = clusterGetNodeForSlot(slot);
NodeResult<List<byte[]>> result = clusterCommandExecutor
.executeCommandOnSingleNode(
(JedisClusterCommandCallback<List<byte[]>>) client -> JedisConverters.stringListToByteList()
.convert(client.clusterGetKeysInSlot(slot, count != null ? count.intValue() : Integer.MAX_VALUE)),
node);
JedisClusterCommandCallback<List<byte[]>> command = jedis ->
JedisConverters.stringListToByteList().convert(jedis.clusterGetKeysInSlot(slot, nullSafeIntValue(count)));
NodeResult<List<byte[]>> result = this.clusterCommandExecutor.executeCommandOnSingleNode(command, node);
return result.getValue();
}
private int nullSafeIntValue(@Nullable Integer value) {
return value != null ? value : Integer.MAX_VALUE;
}
@Override
public void clusterAddSlots(RedisClusterNode node, int... slots) {
clusterCommandExecutor.executeCommandOnSingleNode(
(JedisClusterCommandCallback<String>) client -> client.clusterAddSlots(slots), node);
JedisClusterCommandCallback<String> command = jedis -> jedis.clusterAddSlots(slots);
this.clusterCommandExecutor.executeCommandOnSingleNode(command, node);
}
@Override
@@ -478,16 +493,17 @@ public class JedisClusterConnection implements RedisClusterConnection {
RedisClusterNode node = clusterGetNodeForSlot(slot);
return clusterCommandExecutor.executeCommandOnSingleNode(
(JedisClusterCommandCallback<Long>) client -> client.clusterCountKeysInSlot(slot), node).getValue();
JedisClusterCommandCallback<Long> command = jedis -> jedis.clusterCountKeysInSlot(slot);
return this.clusterCommandExecutor.executeCommandOnSingleNode(command, node).getValue();
}
@Override
public void clusterDeleteSlots(RedisClusterNode node, int... slots) {
clusterCommandExecutor.executeCommandOnSingleNode(
(JedisClusterCommandCallback<String>) client -> client.clusterDelSlots(slots), node);
JedisClusterCommandCallback<String> command = jedis -> jedis.clusterDelSlots(slots);
this.clusterCommandExecutor.executeCommandOnSingleNode(command, node);
}
@Override
@@ -501,50 +517,54 @@ public class JedisClusterConnection implements RedisClusterConnection {
@Override
public void clusterForget(RedisClusterNode node) {
Set<RedisClusterNode> nodes = new LinkedHashSet<>(topologyProvider.getTopology().getActiveMasterNodes());
RedisClusterNode nodeToRemove = topologyProvider.getTopology().lookup(node);
Set<RedisClusterNode> nodes = new LinkedHashSet<>(this.topologyProvider.getTopology().getActiveMasterNodes());
RedisClusterNode nodeToRemove = this.topologyProvider.getTopology().lookup(node);
nodes.remove(nodeToRemove);
clusterCommandExecutor.executeCommandAsyncOnNodes(
(JedisClusterCommandCallback<String>) client -> client.clusterForget(node.getId()), nodes);
JedisClusterCommandCallback<String> command = jedis -> jedis.clusterForget(node.getId());
this.clusterCommandExecutor.executeCommandAsyncOnNodes(command, nodes);
}
@Override
@SuppressWarnings("all")
public void clusterMeet(RedisClusterNode node) {
Assert.notNull(node, "Cluster node must not be null for CLUSTER MEET command");
Assert.hasText(node.getHost(), "Node to meet cluster must have a host");
Assert.isTrue(node.getPort() > 0, "Node to meet cluster must have a port greater 0");
clusterCommandExecutor.executeCommandOnAllNodes(
(JedisClusterCommandCallback<String>) client -> client.clusterMeet(node.getHost(), node.getPort()));
JedisClusterCommandCallback<String> command = jedis -> jedis.clusterMeet(node.getHost(), node.getPort());
this.clusterCommandExecutor.executeCommandOnAllNodes(command);
}
@Override
public void clusterReplicate(RedisClusterNode master, RedisClusterNode replica) {
RedisClusterNode masterNode = topologyProvider.getTopology().lookup(master);
RedisClusterNode masterNode = this.topologyProvider.getTopology().lookup(master);
clusterCommandExecutor.executeCommandOnSingleNode(
(JedisClusterCommandCallback<String>) client -> client.clusterReplicate(masterNode.getId()), replica);
JedisClusterCommandCallback<String> command = jedis -> jedis.clusterReplicate(masterNode.getId());
this.clusterCommandExecutor.executeCommandOnSingleNode(command, replica);
}
@Override
public Integer clusterGetSlotForKey(byte[] key) {
return clusterCommandExecutor
.executeCommandOnArbitraryNode(
(JedisClusterCommandCallback<Integer>) client -> (int) client.clusterKeySlot(JedisConverters.toString(key)))
.getValue();
JedisClusterCommandCallback<Integer> command = jedis ->
Long.valueOf(jedis.clusterKeySlot(JedisConverters.toString(key))).intValue();
return this.clusterCommandExecutor.executeCommandOnArbitraryNode(command).getValue();
}
@Override
public RedisClusterNode clusterGetNodeForKey(byte[] key) {
return topologyProvider.getTopology().getKeyServingMasterNode(key);
return this.topologyProvider.getTopology().getKeyServingMasterNode(key);
}
@Override
@Override @Nullable
public RedisClusterNode clusterGetNodeForSlot(int slot) {
for (RedisClusterNode node : topologyProvider.getTopology().getSlotServingNodes(slot)) {
@@ -558,7 +578,7 @@ public class JedisClusterConnection implements RedisClusterConnection {
@Override
public Set<RedisClusterNode> clusterGetNodes() {
return topologyProvider.getTopology().getNodes();
return this.topologyProvider.getTopology().getNodes();
}
@Override
@@ -566,21 +586,26 @@ public class JedisClusterConnection implements RedisClusterConnection {
Assert.notNull(master, "Master cannot be null");
RedisClusterNode nodeToUse = topologyProvider.getTopology().lookup(master);
RedisClusterNode nodeToUse = this.topologyProvider.getTopology().lookup(master);
return JedisConverters.toSetOfRedisClusterNodes(clusterCommandExecutor
.executeCommandOnSingleNode(
(JedisClusterCommandCallback<List<String>>) client -> client.clusterSlaves(nodeToUse.getId()), master)
.getValue());
JedisClusterCommandCallback<List<String>> command = jedis -> jedis.clusterSlaves(nodeToUse.getId());
List<String> clusterNodes = this.clusterCommandExecutor.executeCommandOnSingleNode(command, master).getValue();
return JedisConverters.toSetOfRedisClusterNodes(clusterNodes);
}
@Override
public Map<RedisClusterNode, Collection<RedisClusterNode>> clusterGetMasterReplicaMap() {
List<NodeResult<Collection<RedisClusterNode>>> nodeResults = clusterCommandExecutor.executeCommandAsyncOnNodes(
(JedisClusterCommandCallback<Collection<RedisClusterNode>>) client -> JedisConverters
.toSetOfRedisClusterNodes(client.clusterSlaves(client.clusterMyId())),
topologyProvider.getTopology().getActiveMasterNodes()).getResults();
JedisClusterCommandCallback<Collection<RedisClusterNode>> command = jedis ->
JedisConverters.toSetOfRedisClusterNodes(jedis.clusterSlaves(jedis.clusterMyId()));
Set<RedisClusterNode> activeMasterNodes = this.topologyProvider.getTopology().getActiveMasterNodes();
List<NodeResult<Collection<RedisClusterNode>>> nodeResults =
this.clusterCommandExecutor.executeCommandAsyncOnNodes(command,activeMasterNodes)
.getResults();
Map<RedisClusterNode, Collection<RedisClusterNode>> result = new LinkedHashMap<>();
@@ -594,19 +619,22 @@ public class JedisClusterConnection implements RedisClusterConnection {
@Override
public ClusterInfo clusterGetClusterInfo() {
return new ClusterInfo(JedisConverters.toProperties(clusterCommandExecutor
.executeCommandOnArbitraryNode((JedisClusterCommandCallback<String>) Jedis::clusterInfo).getValue()));
JedisClusterCommandCallback<String> command = Jedis::clusterInfo;
String source = this.clusterCommandExecutor.executeCommandOnArbitraryNode(command).getValue();
return new ClusterInfo(JedisConverters.toProperties(source));
}
/*
* --> Little helpers to make it work
* Little helpers to make it work
*/
protected DataAccessException convertJedisAccessException(Exception ex) {
protected DataAccessException convertJedisAccessException(Exception cause) {
DataAccessException translated = EXCEPTION_TRANSLATION.translate(ex);
DataAccessException translated = EXCEPTION_TRANSLATION.translate(cause);
return translated != null ? translated : new RedisSystemException(ex.getMessage(), ex);
return translated != null ? translated : new RedisSystemException(cause.getMessage(), cause);
}
@Override
@@ -615,8 +643,8 @@ public class JedisClusterConnection implements RedisClusterConnection {
if (!closed && disposeClusterCommandExecutorOnClose) {
try {
clusterCommandExecutor.destroy();
} catch (Exception ex) {
log.warn("Cannot properly close cluster command executor", ex);
} catch (Exception cause) {
log.warn("Cannot properly close cluster command executor", cause);
}
}

View File

@@ -98,6 +98,7 @@ import org.springframework.util.StringUtils;
* @author Ninad Divadkar
* @author Guy Korland
* @author dengliming
* @author John Blum
*/
@SuppressWarnings("ConstantConditions")
abstract class JedisConverters extends Converters {
@@ -122,7 +123,6 @@ abstract class JedisConverters extends Converters {
/**
* {@link ListConverter} converting jedis {@link redis.clients.jedis.resps.Tuple} to {@link Tuple}.
*
* @return
* @since 1.4
*/
static ListConverter<redis.clients.jedis.resps.Tuple, Tuple> tuplesToTuples() {
@@ -145,7 +145,6 @@ abstract class JedisConverters extends Converters {
* Map a {@link Set} of {@link Tuple} by {@code value} to its {@code score}.
*
* @param tuples must not be {@literal null}.
* @return
* @since 2.0
*/
public static Map<byte[], Double> toTupleMap(Set<Tuple> tuples) {
@@ -170,8 +169,6 @@ abstract class JedisConverters extends Converters {
}
/**
* @param source
* @return
* @since 1.6
*/
public static byte[] toBytes(Double source) {
@@ -204,24 +201,23 @@ abstract class JedisConverters extends Converters {
}
/**
* @param source
* @return
* @since 1.7
*/
@SuppressWarnings("unchecked")
public static RedisClusterNode toNode(Object source) {
List<Object> values = (List<Object>) source;
RedisClusterNode.SlotRange range = new RedisClusterNode.SlotRange(((Number) values.get(0)).intValue(),
((Number) values.get(1)).intValue());
List<Object> nodeInfo = (List<Object>) values.get(2);
return new RedisClusterNode(toString((byte[]) nodeInfo.get(0)), ((Number) nodeInfo.get(1)).intValue(), range);
}
/**
* @param source
* @return
* @since 1.3
*/
public static List<RedisClientInfo> toListOfRedisClientInformation(String source) {
@@ -234,8 +230,6 @@ abstract class JedisConverters extends Converters {
}
/**
* @param source
* @return
* @since 1.4
*/
public static List<RedisServer> toListOfRedisServer(List<Map<String, String>> source) {
@@ -288,6 +282,7 @@ abstract class JedisConverters extends Converters {
}
public static BitOP toBitOp(BitOperation bitOp) {
return switch (bitOp) {
case AND -> BitOP.AND;
case OR -> BitOP.OR;
@@ -297,12 +292,9 @@ abstract class JedisConverters extends Converters {
}
/**
* Converts a given {@link Bound} to its binary representation suitable for {@literal ZRANGEBY*} commands, despite
* {@literal ZRANGEBYLEX}.
* Converts a given {@link org.springframework.data.domain.Range.Bound} to its binary representation suitable for
* {@literal ZRANGEBY*} commands, despite {@literal ZRANGEBYLEX}.
*
* @param boundary
* @param defaultValue
* @return
* @since 1.6
*/
public static byte[] boundaryToBytesForZRange(@Nullable org.springframework.data.domain.Range.Bound<?> boundary,
@@ -316,10 +308,9 @@ abstract class JedisConverters extends Converters {
}
/**
* Converts a given {@link Bound} to its binary representation suitable for ZRANGEBYLEX command.
* Converts a given {@link org.springframework.data.domain.Range.Bound} to its binary representation suitable for
* {@literal ZRANGEBYLEX} command.
*
* @param boundary
* @return
* @since 1.6
*/
public static byte[] boundaryToBytesForZRangeByLex(
@@ -342,7 +333,6 @@ abstract class JedisConverters extends Converters {
* </dl>
*
* @param expiration must not be {@literal null}.
* @return
* @since 2.2
*/
public static SetParams toSetCommandExPxArgument(Expiration expiration) {
@@ -359,8 +349,6 @@ abstract class JedisConverters extends Converters {
* </dl>
*
* @param expiration must not be {@literal null}.
* @param params
* @return
* @since 2.2
*/
public static SetParams toSetCommandExPxArgument(Expiration expiration, SetParams params) {
@@ -395,7 +383,6 @@ abstract class JedisConverters extends Converters {
* </dl>
*
* @param expiration must not be {@literal null}.
* @return
* @since 2.6
*/
static GetExParams toGetExParams(Expiration expiration) {
@@ -429,7 +416,6 @@ abstract class JedisConverters extends Converters {
* </dl>
*
* @param option must not be {@literal null}.
* @return
* @since 2.2
*/
public static SetParams toSetCommandNxXxArgument(SetOption option) {
@@ -448,8 +434,6 @@ abstract class JedisConverters extends Converters {
* </dl>
*
* @param option must not be {@literal null}.
* @param params
* @return
* @since 2.2
*/
public static SetParams toSetCommandNxXxArgument(SetOption option, SetParams params) {
@@ -492,9 +476,6 @@ abstract class JedisConverters extends Converters {
/**
* Convert {@link ScanOptions} to Jedis {@link ScanParams}.
*
* @param options
* @return
*/
public static ScanParams toScanParams(ScanOptions options) {
@@ -523,8 +504,6 @@ abstract class JedisConverters extends Converters {
}
/**
* @param source
* @return
* @since 1.8
*/
public static List<String> toStrings(List<byte[]> source) {
@@ -547,7 +526,6 @@ abstract class JedisConverters extends Converters {
}
/**
* @return
* @since 1.8
*/
public static ListConverter<redis.clients.jedis.GeoCoordinate, Point> geoCoordinateToPointConverter() {
@@ -555,7 +533,6 @@ abstract class JedisConverters extends Converters {
}
/**
* @return
* @since 2.5
*/
@Nullable
@@ -566,8 +543,6 @@ abstract class JedisConverters extends Converters {
/**
* Convert {@link Point} into {@link GeoCoordinate}.
*
* @param source
* @return
* @since 1.8
*/
public static GeoCoordinate toGeoCoordinate(Point source) {
@@ -577,8 +552,6 @@ abstract class JedisConverters extends Converters {
/**
* Get a {@link Converter} capable of converting {@link GeoRadiusResponse} into {@link GeoResults}.
*
* @param metric
* @return
* @since 1.8
*/
public static Converter<List<GeoRadiusResponse>, GeoResults<GeoLocation<byte[]>>> geoRadiusResponseToGeoResultsConverter(
@@ -589,8 +562,6 @@ abstract class JedisConverters extends Converters {
/**
* Convert {@link Metric} into {@link GeoUnit}.
*
* @param metric
* @return
* @since 1.8
*/
public static GeoUnit toGeoUnit(Metric metric) {
@@ -640,8 +611,6 @@ abstract class JedisConverters extends Converters {
/**
* Convert {@link GeoRadiusCommandArgs} into {@link GeoRadiusParam}.
*
* @param source
* @return
* @since 1.8
*/
public static GeoRadiusParam toGeoRadiusParam(GeoRadiusCommandArgs source) {
@@ -677,9 +646,6 @@ abstract class JedisConverters extends Converters {
/**
* Convert a timeout to seconds using {@code double} representation including fraction of seconds.
*
* @param timeout
* @param unit
* @return
* @since 2.6
*/
static double toSeconds(long timeout, TimeUnit unit) {
@@ -697,7 +663,6 @@ abstract class JedisConverters extends Converters {
/**
* Convert given {@link BitFieldSubCommands} into argument array.
*
* @param source
* @return never {@literal null}.
* @since 1.8
*/

View File

@@ -62,6 +62,7 @@ import org.springframework.util.ObjectUtils;
*
* @author Christoph Strobl
* @author Mark Paluch
* @author John Blum
* @since 1.7
*/
public class LettuceClusterConnection extends LettuceConnection
@@ -70,7 +71,14 @@ public class LettuceClusterConnection extends LettuceConnection
static final ExceptionTranslationStrategy exceptionConverter = new PassThroughExceptionTranslationStrategy(
LettuceExceptionConverter.INSTANCE);
private boolean disposeClusterCommandExecutorOnClose;
private ClusterCommandExecutor clusterCommandExecutor;
private ClusterTopologyProvider topologyProvider;
private final Log log = LogFactory.getLog(getClass());
private final LettuceClusterGeoCommands geoCommands = new LettuceClusterGeoCommands(this);
private final LettuceClusterHashCommands hashCommands = new LettuceClusterHashCommands(this);
private final LettuceClusterHyperLogLogCommands hllCommands = new LettuceClusterHyperLogLogCommands(this);
@@ -81,10 +89,6 @@ public class LettuceClusterConnection extends LettuceConnection
private final LettuceClusterZSetCommands zSetCommands = new LettuceClusterZSetCommands(this);
private final LettuceClusterServerCommands serverCommands = new LettuceClusterServerCommands(this);
private ClusterCommandExecutor clusterCommandExecutor;
private ClusterTopologyProvider topologyProvider;
private boolean disposeClusterCommandExecutorOnClose;
/**
* Creates new {@link LettuceClusterConnection} using {@link RedisClusterClient} with default
* {@link RedisURI#DEFAULT_TIMEOUT_DURATION timeout} and a fresh {@link ClusterCommandExecutor} that gets destroyed on
@@ -206,12 +210,14 @@ public class LettuceClusterConnection extends LettuceConnection
LettuceConnectionProvider connectionProvider = getConnectionProvider();
if (connectionProvider instanceof RedisClientProvider) {
return (RedisClusterClient) ((RedisClientProvider) getConnectionProvider()).getRedisClient();
if (connectionProvider instanceof RedisClientProvider redisClientProvider) {
return (RedisClusterClient) redisClientProvider.getRedisClient();
}
throw new IllegalStateException(String.format("Connection provider %s does not implement RedisClientProvider",
connectionProvider.getClass().getName()));
String message = String.format("Connection provider %s does not implement RedisClientProvider",
connectionProvider.getClass().getName());
throw new IllegalStateException(message);
}
@Override
@@ -266,8 +272,8 @@ public class LettuceClusterConnection extends LettuceConnection
@Override
public String ping() {
Collection<String> ping = clusterCommandExecutor
.executeCommandOnAllNodes((LettuceClusterCommandCallback<String>) BaseRedisCommands::ping).resultsAsList();
Collection<String> ping = this.clusterCommandExecutor.executeCommandOnAllNodes(pingCommand()).resultsAsList();
for (String result : ping) {
if (!ObjectUtils.nullSafeEquals("PONG", result)) {
@@ -280,14 +286,16 @@ public class LettuceClusterConnection extends LettuceConnection
@Override
public String ping(RedisClusterNode node) {
return this.clusterCommandExecutor.executeCommandOnSingleNode(pingCommand(), node).getValue();
}
return clusterCommandExecutor
.executeCommandOnSingleNode((LettuceClusterCommandCallback<String>) BaseRedisCommands::ping, node).getValue();
private LettuceClusterCommandCallback<String> pingCommand() {
return BaseRedisCommands::ping;
}
@Override
public List<RedisClusterNode> clusterGetNodes() {
return new ArrayList<>(topologyProvider.getTopology().getNodes());
return new ArrayList<>(this.topologyProvider.getTopology().getNodes());
}
@Override
@@ -295,21 +303,24 @@ public class LettuceClusterConnection extends LettuceConnection
Assert.notNull(master, "Master must not be null");
RedisClusterNode nodeToUse = topologyProvider.getTopology().lookup(master);
RedisClusterNode nodeToUse = this.topologyProvider.getTopology().lookup(master);
return clusterCommandExecutor
.executeCommandOnSingleNode((LettuceClusterCommandCallback<Set<RedisClusterNode>>) client -> LettuceConverters
.toSetOfRedisClusterNodes(client.clusterSlaves(nodeToUse.getId())), master)
.getValue();
LettuceClusterCommandCallback<Set<RedisClusterNode>> command = client ->
LettuceConverters.toSetOfRedisClusterNodes(client.clusterSlaves(nodeToUse.getId()));
return this.clusterCommandExecutor.executeCommandOnSingleNode(command, master).getValue();
}
@Override
public Map<RedisClusterNode, Collection<RedisClusterNode>> clusterGetMasterReplicaMap() {
List<NodeResult<Collection<RedisClusterNode>>> nodeResults = clusterCommandExecutor.executeCommandAsyncOnNodes(
(LettuceClusterCommandCallback<Collection<RedisClusterNode>>) client -> Converters
.toSetOfRedisClusterNodes(client.clusterSlaves(client.clusterMyId())),
topologyProvider.getTopology().getActiveMasterNodes()).getResults();
Set<RedisClusterNode> activeMasterNodes = this.topologyProvider.getTopology().getActiveMasterNodes();
LettuceClusterCommandCallback<Collection<RedisClusterNode>> command = client ->
Converters.toSetOfRedisClusterNodes(client.clusterSlaves(client.clusterMyId()));
List<NodeResult<Collection<RedisClusterNode>>> nodeResults =
this.clusterCommandExecutor.executeCommandAsyncOnNodes(command,activeMasterNodes).getResults();
Map<RedisClusterNode, Collection<RedisClusterNode>> result = new LinkedHashMap<>();
@@ -325,14 +336,13 @@ public class LettuceClusterConnection extends LettuceConnection
return SlotHash.getSlot(key);
}
@Nullable
@Override
public RedisClusterNode clusterGetNodeForSlot(int slot) {
Set<RedisClusterNode> nodes = topologyProvider.getTopology().getSlotServingNodes(slot);
if (nodes.isEmpty()) {
return null;
}
return nodes.iterator().next();
return !nodes.isEmpty() ? nodes.iterator().next() : null;
}
@Override
@@ -343,17 +353,18 @@ public class LettuceClusterConnection extends LettuceConnection
@Override
public ClusterInfo clusterGetClusterInfo() {
return clusterCommandExecutor
.executeCommandOnArbitraryNode((LettuceClusterCommandCallback<ClusterInfo>) client -> new ClusterInfo(
LettuceConverters.toProperties(client.clusterInfo())))
.getValue();
LettuceClusterCommandCallback<ClusterInfo> command = client ->
new ClusterInfo(LettuceConverters.toProperties(client.clusterInfo()));
return this.clusterCommandExecutor.executeCommandOnArbitraryNode(command).getValue();
}
@Override
public void clusterAddSlots(RedisClusterNode node, int... slots) {
clusterCommandExecutor.executeCommandOnSingleNode(
(LettuceClusterCommandCallback<String>) client -> client.clusterAddSlots(slots), node);
LettuceClusterCommandCallback<String> command = client -> client.clusterAddSlots(slots);
this.clusterCommandExecutor.executeCommandOnSingleNode(command, node);
}
@Override
@@ -369,15 +380,17 @@ public class LettuceClusterConnection extends LettuceConnection
try {
return getConnection().clusterCountKeysInSlot(slot);
} catch (Exception ex) {
throw exceptionConverter.translate(ex);
} catch (Exception cause) {
throw this.exceptionConverter.translate(cause);
}
}
@Override
public void clusterDeleteSlots(RedisClusterNode node, int... slots) {
clusterCommandExecutor.executeCommandOnSingleNode(
(LettuceClusterCommandCallback<String>) client -> client.clusterDelSlots(slots), node);
LettuceClusterCommandCallback<String> command = client -> client.clusterDelSlots(slots);
this.clusterCommandExecutor.executeCommandOnSingleNode(command, node);
}
@Override
@@ -393,21 +406,25 @@ public class LettuceClusterConnection extends LettuceConnection
List<RedisClusterNode> nodes = new ArrayList<>(clusterGetNodes());
RedisClusterNode nodeToRemove = topologyProvider.getTopology().lookup(node);
nodes.remove(nodeToRemove);
this.clusterCommandExecutor.executeCommandAsyncOnNodes(
(LettuceClusterCommandCallback<String>) client -> client.clusterForget(nodeToRemove.getId()), nodes);
LettuceClusterCommandCallback<String> command = client -> client.clusterForget(nodeToRemove.getId());
this.clusterCommandExecutor.executeCommandAsyncOnNodes(command, nodes);
}
@Override
@SuppressWarnings("all")
public void clusterMeet(RedisClusterNode node) {
Assert.notNull(node, "Cluster node must not be null for CLUSTER MEET command");
Assert.hasText(node.getHost(), "Node to meet cluster must have a host");
Assert.isTrue(node.getPort() > 0, "Node to meet cluster must have a port greater 0");
this.clusterCommandExecutor.executeCommandOnAllNodes(
(LettuceClusterCommandCallback<String>) client -> client.clusterMeet(node.getHost(), node.getPort()));
LettuceClusterCommandCallback<String> command = client -> client.clusterMeet(node.getHost(), node.getPort());
this.clusterCommandExecutor.executeCommandOnAllNodes(command);
}
@Override
@@ -419,12 +436,14 @@ public class LettuceClusterConnection extends LettuceConnection
RedisClusterNode nodeToUse = topologyProvider.getTopology().lookup(node);
String nodeId = nodeToUse.getId();
clusterCommandExecutor.executeCommandOnSingleNode((LettuceClusterCommandCallback<String>) client -> switch (mode) {
LettuceClusterCommandCallback<String> command = client -> switch (mode) {
case MIGRATING -> client.clusterSetSlotMigrating(slot, nodeId);
case IMPORTING -> client.clusterSetSlotImporting(slot, nodeId);
case NODE -> client.clusterSetSlotNode(slot, nodeId);
case STABLE -> client.clusterSetSlotStable(slot);
}, node);
};
this.clusterCommandExecutor.executeCommandOnSingleNode(command, node);
}
@Override
@@ -432,17 +451,19 @@ public class LettuceClusterConnection extends LettuceConnection
try {
return getConnection().clusterGetKeysInSlot(slot, count);
} catch (Exception ex) {
throw exceptionConverter.translate(ex);
} catch (Exception cause) {
throw this.exceptionConverter.translate(cause);
}
}
@Override
public void clusterReplicate(RedisClusterNode master, RedisClusterNode replica) {
RedisClusterNode masterNode = topologyProvider.getTopology().lookup(master);
clusterCommandExecutor.executeCommandOnSingleNode(
(LettuceClusterCommandCallback<String>) client -> client.clusterReplicate(masterNode.getId()), replica);
RedisClusterNode masterNode = this.topologyProvider.getTopology().lookup(master);
LettuceClusterCommandCallback<String> command = client -> client.clusterReplicate(masterNode.getId());
this.clusterCommandExecutor.executeCommandOnSingleNode(command, replica);
}
@Override
@@ -485,7 +506,7 @@ public class LettuceClusterConnection extends LettuceConnection
}
public ClusterCommandExecutor getClusterCommandExecutor() {
return clusterCommandExecutor;
return this.clusterCommandExecutor;
}
@Override
@@ -543,25 +564,24 @@ public class LettuceClusterConnection extends LettuceConnection
Assert.notNull(node, "Node must not be null");
if (connection == null) {
if (this.connection == null) {
synchronized (this) {
if (connection == null) {
this.connection = connectionProvider.getConnection(StatefulRedisClusterConnection.class);
if (this.connection == null) {
this.connection = this.connectionProvider.getConnection(StatefulRedisClusterConnection.class);
}
}
}
return connection.getConnection(node.getHost(), node.getPort()).sync();
return this.connection.getConnection(node.getHost(), node.getPort()).sync();
}
@Override
@SuppressWarnings("unchecked")
public void returnResourceForSpecificNode(RedisClusterNode node, Object resource) {}
@Override
public void destroy() throws Exception {
if (connection != null) {
connectionProvider.release(connection);
if (this.connection != null) {
this.connectionProvider.release(this.connection);
}
}
}

View File

@@ -37,7 +37,6 @@ import org.springframework.data.geo.Point;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.BitFieldSubCommands.BitFieldGet;
import org.springframework.data.redis.connection.BitFieldSubCommands.BitFieldIncrBy;
import org.springframework.data.redis.connection.BitFieldSubCommands.BitFieldIncrBy.Overflow;
import org.springframework.data.redis.connection.BitFieldSubCommands.BitFieldSet;
import org.springframework.data.redis.connection.BitFieldSubCommands.BitFieldSubCommand;
import org.springframework.data.redis.connection.Limit;
@@ -79,6 +78,7 @@ import org.springframework.util.StringUtils;
* @author dengliming
* @author Chris Bono
* @author Vikas Garg
* @author John Blum
*/
@SuppressWarnings("ConstantConditions")
public abstract class LettuceConverters extends Converters {
@@ -92,7 +92,6 @@ public abstract class LettuceConverters extends Converters {
private static final long INDEXED_RANGE_END = -1;
static {
PLUS_BYTES = toBytes("+");
MINUS_BYTES = toBytes("-");
POSITIVE_INFINITY_BYTES = toBytes("+inf");
@@ -100,7 +99,9 @@ public abstract class LettuceConverters extends Converters {
}
public static Point geoCoordinatesToPoint(@Nullable GeoCoordinates geoCoordinate) {
return geoCoordinate != null ? new Point(geoCoordinate.getX().doubleValue(), geoCoordinate.getY().doubleValue())
return geoCoordinate != null
? new Point(geoCoordinate.getX().doubleValue(), geoCoordinate.getY().doubleValue())
: null;
}
@@ -109,15 +110,19 @@ public abstract class LettuceConverters extends Converters {
}
public static Converter<List<ScoredValue<byte[]>>, List<Tuple>> scoredValuesToTupleList() {
return source -> {
if (source == null) {
return null;
}
List<Tuple> tuples = new ArrayList<>(source.size());
for (ScoredValue<byte[]> value : source) {
tuples.add(LettuceConverters.toTuple(value));
}
return tuples;
};
}
@@ -127,8 +132,7 @@ public abstract class LettuceConverters extends Converters {
}
/**
* @return
* @sice 1.3
* @since 1.3
*/
public static Converter<Long, Boolean> longToBooleanConverter() {
return Converters::toBoolean;
@@ -147,7 +151,9 @@ public abstract class LettuceConverters extends Converters {
if (source == null) {
return null;
}
List<byte[]> list = new ArrayList<>(2);
list.add(source.getKey());
list.add(source.getValue());
@@ -155,56 +161,63 @@ public abstract class LettuceConverters extends Converters {
}
public static List<byte[]> toBytesList(Collection<byte[]> source) {
if (source instanceof List) {
return (List<byte[]>) source;
}
return source != null ? new ArrayList<>(source) : null;
}
public static Tuple toTuple(@Nullable ScoredValue<byte[]> source) {
return source != null && source.hasValue() ? new DefaultTuple(source.getValue(), Double.valueOf(source.getScore()))
return source != null && source.hasValue()
? new DefaultTuple(source.getValue(), Double.valueOf(source.getScore()))
: null;
}
public static String toString(@Nullable byte[] source) {
if (source == null || Arrays.equals(source, new byte[0])) {
return null;
}
return new String(source);
}
public static ScriptOutputType toScriptOutputType(ReturnType returnType) {
Assert.notNull(returnType, () -> "Return type " + returnType + " is not a supported script output type");
return switch (returnType) {
case BOOLEAN -> ScriptOutputType.BOOLEAN;
case MULTI -> ScriptOutputType.MULTI;
case VALUE -> ScriptOutputType.VALUE;
case INTEGER -> ScriptOutputType.INTEGER;
case STATUS -> ScriptOutputType.STATUS;
default ->
throw new IllegalArgumentException("Return type " + returnType + " is not a supported script output type");
};
}
public static boolean toBoolean(Position where) {
Assert.notNull(where, "list positions are mandatory");
return (Position.AFTER.equals(where) ? false : true);
return !Position.AFTER.equals(where);
}
public static int toInt(boolean value) {
return (value ? 1 : 0);
return value ? 1 : 0;
}
public static Map<byte[], byte[]> toMap(List<byte[]> source) {
if (CollectionUtils.isEmpty(source)) {
return Collections.emptyMap();
}
Map<byte[], byte[]> target = new LinkedHashMap<>();
Iterator<byte[]> keyValue = source.iterator();
Iterator<byte[]> kv = source.iterator();
while (kv.hasNext()) {
target.put(kv.next(), kv.hasNext() ? kv.next() : null);
while (keyValue.hasNext()) {
target.put(keyValue.next(), keyValue.hasNext() ? keyValue.next() : null);
}
return target;
@@ -213,6 +226,7 @@ public abstract class LettuceConverters extends Converters {
public static SortArgs toSortArgs(SortParameters params) {
SortArgs args = new SortArgs();
if (params == null) {
return args;
}
@@ -235,10 +249,13 @@ public abstract class LettuceConverters extends Converters {
args.desc();
}
}
Boolean isAlpha = params.isAlphabetic();
if (isAlpha != null && isAlpha) {
args.alpha();
}
return args;
}
@@ -254,7 +271,6 @@ public abstract class LettuceConverters extends Converters {
/**
* Convert a {@link Limit} to a Lettuce {@link io.lettuce.core.Limit}.
*
* @param limit
* @return a lettuce {@link io.lettuce.core.Limit}.
* @since 2.0
*/
@@ -266,8 +282,6 @@ public abstract class LettuceConverters extends Converters {
/**
* Convert a {@link org.springframework.data.redis.connection.RedisZSetCommands.Range} to a lettuce {@link Range}.
*
* @param range
* @return
* @since 2.0
*/
public static <T> Range<T> toRange(org.springframework.data.domain.Range<T> range) {
@@ -277,9 +291,6 @@ public abstract class LettuceConverters extends Converters {
/**
* Convert a {@link org.springframework.data.domain.Range} to a lettuce {@link Range}.
*
* @param range
* @param convertNumberToBytes
* @return
* @since 2.2
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -287,6 +298,7 @@ public abstract class LettuceConverters extends Converters {
Range.Boundary upper = RangeConverter.convertBound(range.getUpperBound(), convertNumberToBytes, null,
it -> it.getBytes(StandardCharsets.UTF_8));
Range.Boundary lower = RangeConverter.convertBound(range.getLowerBound(), convertNumberToBytes, null,
it -> it.getBytes(StandardCharsets.UTF_8));
@@ -296,8 +308,6 @@ public abstract class LettuceConverters extends Converters {
/**
* Convert a {@link org.springframework.data.domain.Range} to a lettuce {@link Range} and reverse boundaries.
*
* @param range
* @return
* @since 2.0
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -305,6 +315,7 @@ public abstract class LettuceConverters extends Converters {
Range.Boundary upper = RangeConverter.convertBound(range.getUpperBound(), false, null,
it -> it.getBytes(StandardCharsets.UTF_8));
Range.Boundary lower = RangeConverter.convertBound(range.getLowerBound(), false, null,
it -> it.getBytes(StandardCharsets.UTF_8));
@@ -324,9 +335,11 @@ public abstract class LettuceConverters extends Converters {
}
List<RedisServer> sentinels = new ArrayList<>();
for (Map<String, String> info : source) {
sentinels.add(RedisServer.newServerFrom(Converters.toProperties(info)));
}
return sentinels;
}
@@ -343,6 +356,7 @@ public abstract class LettuceConverters extends Converters {
Set<RedisNode> sentinels = sentinelConfiguration.getSentinels();
RedisPassword sentinelPassword = sentinelConfiguration.getSentinelPassword();
RedisURI.Builder builder = RedisURI.builder();
for (RedisNode sentinel : sentinels) {
RedisURI.Builder sentinelBuilder = RedisURI.Builder.redis(sentinel.getHost(), sentinel.getPort());
@@ -383,6 +397,7 @@ public abstract class LettuceConverters extends Converters {
static RedisStandaloneConfiguration createRedisStandaloneConfiguration(RedisURI redisURI) {
RedisStandaloneConfiguration standaloneConfiguration = new RedisStandaloneConfiguration();
standaloneConfiguration.setHostName(redisURI.getHost());
standaloneConfiguration.setPort(redisURI.getPort());
standaloneConfiguration.setDatabase(redisURI.getDatabase());
@@ -402,6 +417,7 @@ public abstract class LettuceConverters extends Converters {
static RedisSocketConfiguration createRedisSocketConfiguration(RedisURI redisURI) {
RedisSocketConfiguration socketConfiguration = new RedisSocketConfiguration();
socketConfiguration.setSocket(redisURI.getSocket());
socketConfiguration.setDatabase(redisURI.getDatabase());
@@ -420,16 +436,21 @@ public abstract class LettuceConverters extends Converters {
static RedisSentinelConfiguration createRedisSentinelConfiguration(RedisURI redisURI) {
RedisSentinelConfiguration sentinelConfiguration = new RedisSentinelConfiguration();
if (!ObjectUtils.isEmpty(redisURI.getSentinelMasterId())) {
sentinelConfiguration.setMaster(redisURI.getSentinelMasterId());
}
sentinelConfiguration.setDatabase(redisURI.getDatabase());
for (RedisURI sentinelNodeRedisUri : redisURI.getSentinels()) {
RedisNode sentinelNode = new RedisNode(sentinelNodeRedisUri.getHost(), sentinelNodeRedisUri.getPort());
if (sentinelNodeRedisUri.getPassword() != null) {
sentinelConfiguration.setSentinelPassword(sentinelNodeRedisUri.getPassword());
}
sentinelConfiguration.addSentinel(sentinelNode);
}
@@ -450,10 +471,7 @@ public abstract class LettuceConverters extends Converters {
}
public static byte[] toBytes(@Nullable String source) {
if (source == null) {
return null;
}
return source.getBytes();
return source != null ? source.getBytes() : null;
}
public static byte[] toBytes(Integer source) {
@@ -465,8 +483,6 @@ public abstract class LettuceConverters extends Converters {
}
/**
* @param source
* @return
* @since 1.6
*/
public static byte[] toBytes(Double source) {
@@ -489,8 +505,6 @@ public abstract class LettuceConverters extends Converters {
}
/**
* @param source
* @return
* @since 1.7
*/
public static RedisClusterNode toRedisClusterNode(io.lettuce.core.cluster.models.partitions.RedisClusterNode source) {
@@ -507,18 +521,22 @@ public abstract class LettuceConverters extends Converters {
private static Set<Flag> parseFlags(@Nullable Set<NodeFlag> source) {
Set<Flag> flags = new LinkedHashSet<>(source != null ? source.size() : 8, 1);
for (NodeFlag flag : source) {
switch (flag) {
case NOFLAGS -> flags.add(Flag.NOFLAGS);
case EVENTUAL_FAIL -> flags.add(Flag.PFAIL);
case FAIL -> flags.add(Flag.FAIL);
case HANDSHAKE -> flags.add(Flag.HANDSHAKE);
case MASTER -> flags.add(Flag.MASTER);
case MYSELF -> flags.add(Flag.MYSELF);
case NOADDR -> flags.add(Flag.NOADDR);
case SLAVE, REPLICA -> flags.add(Flag.REPLICA);
if (Objects.nonNull(flag)) {
switch (flag) {
case NOFLAGS -> flags.add(Flag.NOFLAGS);
case EVENTUAL_FAIL -> flags.add(Flag.PFAIL);
case FAIL -> flags.add(Flag.FAIL);
case HANDSHAKE -> flags.add(Flag.HANDSHAKE);
case MASTER -> flags.add(Flag.MASTER);
case MYSELF -> flags.add(Flag.MYSELF);
case NOADDR -> flags.add(Flag.NOADDR);
case SLAVE, REPLICA -> flags.add(Flag.REPLICA);
}
}
}
return flags;
}
@@ -559,12 +577,12 @@ public abstract class LettuceConverters extends Converters {
}
if (option != null) {
switch (option) {
case SET_IF_ABSENT -> args.nx();
case SET_IF_PRESENT -> args.xx();
}
}
return args;
}
@@ -572,7 +590,6 @@ public abstract class LettuceConverters extends Converters {
* Convert {@link Expiration} to {@link GetExArgs}.
*
* @param expiration can be {@literal null}.
* @return
* @since 2.6
*/
static GetExArgs toGetExArgs(@Nullable Expiration expiration) {
@@ -613,22 +630,19 @@ public abstract class LettuceConverters extends Converters {
/**
* Convert {@link Metric} into {@link GeoArgs.Unit}.
*
* @param metric
* @return
* @since 1.8
*/
public static GeoArgs.Unit toGeoArgsUnit(Metric metric) {
Metric metricToUse = metric == null || ObjectUtils.nullSafeEquals(Metrics.NEUTRAL, metric) ? DistanceUnit.METERS
: metric;
Metric metricToUse = metric == null
|| ObjectUtils.nullSafeEquals(Metrics.NEUTRAL, metric) ? DistanceUnit.METERS : metric;
return ObjectUtils.caseInsensitiveValueOf(GeoArgs.Unit.values(), metricToUse.getAbbreviation());
}
/**
* Convert {@link GeoRadiusCommandArgs} into {@link GeoArgs}.
*
* @param args
* @return
* @since 1.8
*/
public static GeoArgs toGeoArgs(GeoRadiusCommandArgs args) {
@@ -638,8 +652,6 @@ public abstract class LettuceConverters extends Converters {
/**
* Convert {@link GeoCommandArgs} into {@link GeoArgs}.
*
* @param args
* @return
* @since 2.6
*/
public static GeoArgs toGeoArgs(GeoCommandArgs args) {
@@ -673,8 +685,6 @@ public abstract class LettuceConverters extends Converters {
/**
* Convert {@link BitFieldSubCommands} into {@link BitFieldArgs}.
*
* @param subCommands
* @return
* @since 2.1
*/
public static BitFieldArgs toBitFieldArgs(BitFieldSubCommands subCommands) {
@@ -683,36 +693,34 @@ public abstract class LettuceConverters extends Converters {
for (BitFieldSubCommand subCommand : subCommands) {
BitFieldArgs.BitFieldType bft = subCommand.getType().isSigned()
BitFieldArgs.BitFieldType bitFieldType = subCommand.getType().isSigned()
? BitFieldArgs.signed(subCommand.getType().getBits())
: BitFieldArgs.unsigned(subCommand.getType().getBits());
BitFieldArgs.Offset offset;
if (subCommand.getOffset().isZeroBased()) {
offset = BitFieldArgs.offset((int) subCommand.getOffset().getValue());
} else {
offset = BitFieldArgs.typeWidthBasedOffset((int) subCommand.getOffset().getValue());
}
BitFieldArgs.Offset offset = subCommand.getOffset().isZeroBased()
? BitFieldArgs.offset((int) subCommand.getOffset().getValue())
: BitFieldArgs.typeWidthBasedOffset((int) subCommand.getOffset().getValue());
if (subCommand instanceof BitFieldGet) {
args = args.get(bft, offset);
args = args.get(bitFieldType, offset);
} else if (subCommand instanceof BitFieldSet) {
args = args.set(bft, offset, ((BitFieldSet) subCommand).getValue());
args = args.set(bitFieldType, offset, ((BitFieldSet) subCommand).getValue());
} else if (subCommand instanceof BitFieldIncrBy) {
BitFieldIncrBy.Overflow overflow = ((BitFieldIncrBy) subCommand).getOverflow();
if (overflow != null) {
BitFieldArgs.OverflowType type = switch (overflow) {
case SAT -> BitFieldArgs.OverflowType.SAT;
case FAIL -> BitFieldArgs.OverflowType.FAIL;
case WRAP -> BitFieldArgs.OverflowType.WRAP;
};
};
args = args.overflow(type);
}
args = args.incrBy(bft, (int) subCommand.getOffset().getValue(), ((BitFieldIncrBy) subCommand).getValue());
args = args.incrBy(bitFieldType, (int) subCommand.getOffset().getValue(), ((BitFieldIncrBy) subCommand).getValue());
}
}
@@ -736,6 +744,7 @@ public abstract class LettuceConverters extends Converters {
KeyScanArgs scanArgs = new KeyScanArgs();
byte[] pattern = options.getBytePattern();
if (pattern != null) {
scanArgs.match(pattern);
}
@@ -754,7 +763,6 @@ public abstract class LettuceConverters extends Converters {
/**
* Get {@link Converter} capable of {@link Set} of {@link Byte} into {@link GeoResults}.
*
* @return
* @since 1.8
*/
public static Converter<Set<byte[]>, GeoResults<GeoLocation<byte[]>>> bytesSetToGeoResultsConverter() {
@@ -767,9 +775,11 @@ public abstract class LettuceConverters extends Converters {
List<GeoResult<GeoLocation<byte[]>>> results = new ArrayList<>(source.size());
Iterator<byte[]> it = source.iterator();
while (it.hasNext()) {
results.add(new GeoResult<>(new GeoLocation<>(it.next(), null), new Distance(0D)));
}
return new GeoResults<>(results);
};
}
@@ -777,8 +787,6 @@ public abstract class LettuceConverters extends Converters {
/**
* Get {@link Converter} capable of convering {@link GeoWithin} into {@link GeoResults}.
*
* @param metric
* @return
* @since 1.8
*/
public static Converter<List<GeoWithin<byte[]>>, GeoResults<GeoLocation<byte[]>>> geoRadiusResponseToGeoResultsConverter(
@@ -793,9 +801,6 @@ public abstract class LettuceConverters extends Converters {
/**
* Return {@link Optional} lower bound from {@link Range}.
*
* @param range
* @param <T>
* @return
* @since 2.0.9
*/
static <T extends Comparable<T>> Optional<T> getLowerBound(org.springframework.data.domain.Range<T> range) {
@@ -805,9 +810,6 @@ public abstract class LettuceConverters extends Converters {
/**
* Return {@link Optional} upper bound from {@link Range}.
*
* @param range
* @param <T>
* @return
* @since 2.0.9
*/
static <T extends Comparable<T>> Optional<T> getUpperBound(org.springframework.data.domain.Range<T> range) {
@@ -818,7 +820,6 @@ public abstract class LettuceConverters extends Converters {
* Return the lower bound index from {@link Range} or {@literal 0} (zero) if the lower range is not bounded to point
* to the first element. To be used with index-based commands such as {@code LRANGE}, {@code GETRANGE}.
*
* @param range
* @return the lower index bound value or {@literal 0} for the first element if not bounded.
* @since 2.0.9
*/
@@ -830,7 +831,6 @@ public abstract class LettuceConverters extends Converters {
* Return the upper bound index from {@link Range} or {@literal -1} (minus one) if the upper range is not bounded to
* point to the last element. To be used with index-based commands such as {@code LRANGE}, {@code GETRANGE}.
*
* @param range
* @return the upper index bound value or {@literal -1} for the last element if not bounded.
* @since 2.0.9
*/
@@ -866,6 +866,7 @@ public abstract class LettuceConverters extends Converters {
BoxShape boxPredicate = (BoxShape) predicate;
BoundingBox boundingBox = boxPredicate.getBoundingBox();
return GeoSearch.byBox(boundingBox.getWidth().getValue(), boundingBox.getHeight().getValue(),
toGeoArgsUnit(boxPredicate.getMetric()));
}
@@ -882,6 +883,7 @@ public abstract class LettuceConverters extends Converters {
if (reference instanceof GeoReference.GeoCoordinateReference) {
GeoCoordinateReference<?> coordinates = (GeoCoordinateReference<?>) reference;
return GeoSearch.fromCoordinates(coordinates.getLongitude(), coordinates.getLatitude());
}
@@ -897,26 +899,26 @@ public abstract class LettuceConverters extends Converters {
return switch (option) {
case ASYNC -> FlushMode.ASYNC;
case SYNC -> FlushMode.SYNC;
};
};
}
/**
* @author Christoph Strobl
* @since 1.8
*/
static enum GeoResultsConverterFactory {
enum GeoResultsConverterFactory {
INSTANCE;
Converter<List<GeoWithin<byte[]>>, GeoResults<GeoLocation<byte[]>>> forMetric(Metric metric) {
return new GeoResultsConverter(
metric == null || ObjectUtils.nullSafeEquals(Metrics.NEUTRAL, metric) ? DistanceUnit.METERS : metric);
return new GeoResultsConverter(metric == null
|| ObjectUtils.nullSafeEquals(Metrics.NEUTRAL, metric) ? DistanceUnit.METERS : metric);
}
private static class GeoResultsConverter
implements Converter<List<GeoWithin<byte[]>>, GeoResults<GeoLocation<byte[]>>> {
private Metric metric;
private final Metric metric;
public GeoResultsConverter(Metric metric) {
this.metric = metric;
@@ -942,7 +944,7 @@ public abstract class LettuceConverters extends Converters {
* @author Christoph Strobl
* @since 1.8
*/
static enum GeoResultConverterFactory {
enum GeoResultConverterFactory {
INSTANCE;
@@ -952,7 +954,7 @@ public abstract class LettuceConverters extends Converters {
private static class GeoResultConverter implements Converter<GeoWithin<byte[]>, GeoResult<GeoLocation<byte[]>>> {
private Metric metric;
private final Metric metric;
public GeoResultConverter(Metric metric) {
this.metric = metric;

View File

@@ -36,7 +36,6 @@ import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.ClusterTopologyProvider;
import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
@@ -83,7 +82,6 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
* @throws IllegalArgumentException when {@code client} is {@literal null}.
* @since 2.0.1
*/
@SuppressWarnings("unchecked")
LettuceReactiveRedisClusterConnection(StatefulConnection<ByteBuffer, ByteBuffer> sharedConnection,
LettuceConnectionProvider connectionProvider, RedisClusterClient client) {
@@ -278,18 +276,17 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
Assert.notNull(node, "Node must not be null");
Assert.notNull(mode, "AddSlots mode must not be null");
return execute(node, cmd -> {
return execute(node, commands -> {
RedisClusterNode nodeToUse = lookup(node);
String nodeId = nodeToUse.getId();
return switch (mode) {
case MIGRATING -> cmd.clusterSetSlotMigrating(slot, nodeId);
case IMPORTING -> cmd.clusterSetSlotImporting(slot, nodeId);
case NODE -> cmd.clusterSetSlotNode(slot, nodeId);
case STABLE -> cmd.clusterSetSlotStable(slot);
};
case MIGRATING -> commands.clusterSetSlotMigrating(slot, nodeId);
case IMPORTING -> commands.clusterSetSlotImporting(slot, nodeId);
case NODE -> commands.clusterSetSlotNode(slot, nodeId);
case STABLE -> commands.clusterSetSlotStable(slot);
};
}).then();
}
@@ -349,7 +346,7 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
return getConnection().map(StatefulRedisClusterConnection::reactive);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@SuppressWarnings({ "unchecked" })
protected Mono<RedisReactiveCommands<ByteBuffer, ByteBuffer>> getCommands(RedisNode node) {
if (StringUtils.hasText(node.getId())) {

View File

@@ -17,13 +17,11 @@ package org.springframework.data.redis.connection.lettuce;
import io.lettuce.core.BitFieldArgs;
import io.lettuce.core.GetExArgs;
import io.lettuce.core.KeyValue;
import io.lettuce.core.SetArgs;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -38,13 +36,18 @@ import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiVa
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.RangeCommand;
import org.springframework.data.redis.connection.ReactiveStringCommands;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.util.Assert;
/**
* {@link ReactiveStringCommands} implemented using {@literal Lettuce}.
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Jiahe Cai
* @author Michele Mancioppi
* @author John Blum
* @since 2.0
*/
class LettuceReactiveStringCommands implements ReactiveStringCommands {
@@ -66,20 +69,20 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands {
@Override
public Flux<MultiValueResponse<List<ByteBuffer>, ByteBuffer>> mGet(Publisher<List<ByteBuffer>> keyCollections) {
return connection.execute(cmd -> Flux.from(keyCollections).concatMap((keys) -> {
return this.connection.execute(reactiveCommands -> Flux.from(keyCollections).concatMap((keys) -> {
Assert.notNull(keys, "Keys must not be null");
return cmd.mget(keys.toArray(new ByteBuffer[0])).collectList().map((value) -> {
return value.stream().map(keyValue -> keyValue.getValueOrElse(null)).collect(Collectors.toList());
}).map((values) -> new MultiValueResponse<>(keys, values));
return reactiveCommands.mget(keys.toArray(new ByteBuffer[0])).collectList()
.map(value -> value.stream().map(keyValue -> keyValue.getValueOrElse(null)).collect(Collectors.toList()))
.map(values -> new MultiValueResponse<>(keys, values));
}));
}
@Override
public Flux<BooleanResponse<SetCommand>> set(Publisher<SetCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap((command) -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getValue(), "Value must not be null");
@@ -87,12 +90,16 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands {
SetArgs args = null;
if (command.getExpiration().isPresent() || command.getOption().isPresent()) {
args = LettuceConverters.toSetArgs(command.getExpiration().isPresent() ? command.getExpiration().get() : null,
command.getOption().isPresent() ? command.getOption().get() : null);
Expiration expiration = command.getExpiration().orElse(null);
RedisStringCommands.SetOption setOption = command.getOption().orElse(null);
args = LettuceConverters.toSetArgs(expiration, setOption);
}
Mono<String> mono = args != null ? cmd.set(command.getKey(), command.getValue(), args)
: cmd.set(command.getKey(), command.getValue());
Mono<String> mono = args != null ? reactiveCommands.set(command.getKey(), command.getValue(), args)
: reactiveCommands.set(command.getKey(), command.getValue());
return mono.map(LettuceConverters::stringToBoolean).map(value -> new BooleanResponse<>(command, value))
.switchIfEmpty(Mono.just(new BooleanResponse<>(command, Boolean.FALSE)));
}));
@@ -101,16 +108,17 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands {
@Override
public Flux<ByteBufferResponse<SetCommand>> getSet(Publisher<SetCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap((command) -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getValue(), "Value must not be null");
if (command.getExpiration().isPresent() || command.getOption().isPresent()) {
throw new IllegalArgumentException("Command must not define expiration nor option for GETSET.");
throw new IllegalArgumentException("Command must not define expiration nor option for GETSET");
}
return cmd.getset(command.getKey(), command.getValue()).map((value) -> new ByteBufferResponse<>(command, value))
return reactiveCommands.getset(command.getKey(), command.getValue())
.map(value -> new ByteBufferResponse<>(command, value))
.defaultIfEmpty(new AbsentByteBufferResponse<>(command));
}));
}
@@ -118,11 +126,12 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands {
@Override
public Flux<ByteBufferResponse<KeyCommand>> get(Publisher<KeyCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap((command) -> {
Assert.notNull(command.getKey(), "Key must not be null");
return cmd.get(command.getKey()).map((value) -> new ByteBufferResponse<>(command, value))
return reactiveCommands.get(command.getKey())
.map(value -> new ByteBufferResponse<>(command, value))
.defaultIfEmpty(new AbsentByteBufferResponse<>(command));
}));
}
@@ -130,11 +139,12 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands {
@Override
public Flux<ByteBufferResponse<KeyCommand>> getDel(Publisher<KeyCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap((command) -> {
Assert.notNull(command.getKey(), "Key must not be null");
return cmd.getdel(command.getKey()).map((value) -> new ByteBufferResponse<>(command, value))
return reactiveCommands.getdel(command.getKey())
.map(value -> new ByteBufferResponse<>(command, value))
.defaultIfEmpty(new AbsentByteBufferResponse<>(command));
}));
}
@@ -142,13 +152,14 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands {
@Override
public Flux<ByteBufferResponse<GetExCommand>> getEx(Publisher<GetExCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap((command) -> {
Assert.notNull(command.getKey(), "Key must not be null");
GetExArgs args = LettuceConverters.toGetExArgs(command.getExpiration());
return cmd.getex(command.getKey(), args).map((value) -> new ByteBufferResponse<>(command, value))
return reactiveCommands.getex(command.getKey(), args)
.map(value -> new ByteBufferResponse<>(command, value))
.defaultIfEmpty(new AbsentByteBufferResponse<>(command));
}));
}
@@ -156,51 +167,59 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands {
@Override
public Flux<BooleanResponse<SetCommand>> setNX(Publisher<SetCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getValue(), "Value must not be null");
return cmd.setnx(command.getKey(), command.getValue()).map((value) -> new BooleanResponse<>(command, value));
return reactiveCommands.setnx(command.getKey(), command.getValue())
.map((value) -> new BooleanResponse<>(command, value));
}));
}
@Override
public Flux<BooleanResponse<SetCommand>> setEX(Publisher<SetCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getValue(), "Value must not be null");
Assert.isTrue(command.getExpiration().isPresent(), "Expiration time must not be null");
return cmd.setex(command.getKey(), command.getExpiration().get().getExpirationTimeInSeconds(), command.getValue())
.map(LettuceConverters::stringToBoolean).map((value) -> new BooleanResponse<>(command, value));
long expirationTimeInSeconds = command.getExpiration().get().getExpirationTimeInSeconds();
return reactiveCommands.setex(command.getKey(), expirationTimeInSeconds, command.getValue())
.map(LettuceConverters::stringToBoolean)
.map((value) -> new BooleanResponse<>(command, value));
}));
}
@Override
public Flux<BooleanResponse<SetCommand>> pSetEX(Publisher<SetCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getValue(), "Value must not be null");
Assert.isTrue(command.getExpiration().isPresent(), "Expiration time must not be null");
return cmd
.psetex(command.getKey(), command.getExpiration().get().getExpirationTimeInMilliseconds(), command.getValue())
.map(LettuceConverters::stringToBoolean).map((value) -> new BooleanResponse<>(command, value));
long expirationTimeInSeconds = command.getExpiration().get().getExpirationTimeInMilliseconds();
return reactiveCommands.psetex(command.getKey(), expirationTimeInSeconds, command.getValue())
.map(LettuceConverters::stringToBoolean)
.map((value) -> new BooleanResponse<>(command, value));
}));
}
@Override
public Flux<BooleanResponse<MSetCommand>> mSet(Publisher<MSetCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notEmpty(command.getKeyValuePairs(), "Pairs must not be null or empty");
return cmd.mset(command.getKeyValuePairs()).map(LettuceConverters::stringToBoolean)
return reactiveCommands.mset(command.getKeyValuePairs())
.map(LettuceConverters::stringToBoolean)
.map((value) -> new BooleanResponse<>(command, value));
}));
}
@@ -208,54 +227,56 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands {
@Override
public Flux<BooleanResponse<MSetCommand>> mSetNX(Publisher<MSetCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notEmpty(command.getKeyValuePairs(), "Pairs must not be null or empty");
return cmd.msetnx(command.getKeyValuePairs()).map((value) -> new BooleanResponse<>(command, value));
return reactiveCommands.msetnx(command.getKeyValuePairs())
.map((value) -> new BooleanResponse<>(command, value));
}));
}
@Override
public Flux<NumericResponse<AppendCommand, Long>> append(Publisher<AppendCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getValue(), "Value must not be null");
return cmd.append(command.getKey(), command.getValue()).map((value) -> new NumericResponse<>(command, value));
return reactiveCommands.append(command.getKey(), command.getValue())
.map(value -> new NumericResponse<>(command, value));
}));
}
@Override
public Flux<ByteBufferResponse<RangeCommand>> getRange(Publisher<RangeCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getRange(), "Range must not be null");
Range<Long> range = command.getRange();
Mono<ByteBuffer> result = cmd.getrange(command.getKey(), //
Mono<ByteBuffer> result = reactiveCommands.getrange(command.getKey(), //
LettuceConverters.getLowerBoundIndex(range), //
LettuceConverters.getUpperBoundIndex(range));
return result.map((value) -> new ByteBufferResponse<>(command, value));
return result.map(value -> new ByteBufferResponse<>(command, value));
}));
}
@Override
public Flux<NumericResponse<SetRangeCommand, Long>> setRange(Publisher<SetRangeCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getValue(), "Value must not be null");
Assert.notNull(command.getOffset(), "Offset must not be null");
return cmd.setrange(command.getKey(), command.getOffset(), command.getValue())
return reactiveCommands.setrange(command.getKey(), command.getOffset(), command.getValue())
.map((value) -> new NumericResponse<>(command, value));
}));
}
@@ -263,12 +284,13 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands {
@Override
public Flux<BooleanResponse<GetBitCommand>> getBit(Publisher<GetBitCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getOffset(), "Offset must not be null");
return cmd.getbit(command.getKey(), command.getOffset()).map(LettuceConverters::toBoolean)
return reactiveCommands.getbit(command.getKey(), command.getOffset())
.map(LettuceConverters::toBoolean)
.map(value -> new BooleanResponse<>(command, value));
}));
}
@@ -276,13 +298,12 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands {
@Override
public Flux<BooleanResponse<SetBitCommand>> setBit(Publisher<SetBitCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getValue(), "Value must not be null");
Assert.notNull(command.getOffset(), "Offset must not be null");
return cmd.setbit(command.getKey(), command.getOffset(), command.getValue() ? 1 : 0)
return reactiveCommands.setbit(command.getKey(), command.getOffset(), command.getValue() ? 1 : 0)
.map(LettuceConverters::toBoolean).map(respValue -> new BooleanResponse<>(command, respValue));
}));
}
@@ -290,54 +311,60 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands {
@Override
public Flux<NumericResponse<BitCountCommand, Long>> bitCount(Publisher<BitCountCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Range<Long> range = command.getRange();
return (!Range.unbounded().equals(range)
? cmd.bitcount(command.getKey(), LettuceConverters.getLowerBoundIndex(range), //
LettuceConverters.getUpperBoundIndex(range)) //
: cmd.bitcount(command.getKey())).map(responseValue -> new NumericResponse<>(command, responseValue));
Mono<Long> bitcount = !Range.unbounded().equals(range)
? reactiveCommands.bitcount(command.getKey(), LettuceConverters.getLowerBoundIndex(range),
LettuceConverters.getUpperBoundIndex(range))
: reactiveCommands.bitcount(command.getKey());
return bitcount.map(responseValue -> new NumericResponse<>(command, responseValue));
}));
}
@Override
public Flux<MultiValueResponse<BitFieldCommand, Long>> bitField(Publisher<BitFieldCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
BitFieldArgs args = LettuceConverters.toBitFieldArgs(command.getSubCommands());
return cmd.bitfield(command.getKey(), args).collectList().map(value -> new MultiValueResponse<>(command,
value.stream().map(v -> v.getValueOrElse(null)).collect(Collectors.toList())));
return reactiveCommands.bitfield(command.getKey(), args).collectList()
.map(value -> new MultiValueResponse<>(command, value.stream()
.map(v -> v.getValueOrElse(null))
.collect(Collectors.toList())));
}));
}
@Override
public Flux<NumericResponse<BitOpCommand, Long>> bitOp(Publisher<BitOpCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getDestinationKey(), "DestinationKey must not be null");
Assert.notEmpty(command.getKeys(), "Keys must not be null or empty");
Mono<Long> result = null;
ByteBuffer destinationKey = command.getDestinationKey();
ByteBuffer[] sourceKeys = command.getKeys().toArray(new ByteBuffer[0]);
Mono<Long> result;
result = switch (command.getBitOp()) {
case AND -> cmd.bitopAnd(destinationKey, sourceKeys);
case OR -> cmd.bitopOr(destinationKey, sourceKeys);
case XOR -> cmd.bitopXor(destinationKey, sourceKeys);
case AND -> reactiveCommands.bitopAnd(destinationKey, sourceKeys);
case OR -> reactiveCommands.bitopOr(destinationKey, sourceKeys);
case XOR -> reactiveCommands.bitopXor(destinationKey, sourceKeys);
case NOT -> {
Assert.isTrue(sourceKeys.length == 1, "BITOP NOT does not allow more than 1 source key.");
yield cmd.bitopNot(destinationKey, sourceKeys[0]);
yield reactiveCommands.bitopNot(destinationKey, sourceKeys[0]);
}
default -> throw new IllegalArgumentException(String.format("Unknown BITOP '%s'.", command.getBitOp()));
default -> throw new IllegalArgumentException(String.format("Unknown BITOP '%s'", command.getBitOp()));
};
return result.map(value -> new NumericResponse<>(command, value));
@@ -347,42 +374,36 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands {
@Override
public Flux<NumericResponse<BitPosCommand, Long>> bitPos(Publisher<BitPosCommand> commands) {
return connection.execute(cmd -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).flatMap(command -> {
return Flux.from(commands).flatMap(command -> {
Mono<Long> result;
Range<Long> range = command.getRange();
Mono<Long> result;
if (range.getLowerBound().isBounded()) {
result = cmd.bitpos(command.getKey(), command.getBit(), getLowerValue(range));
result = reactiveCommands.bitpos(command.getKey(), command.getBit(), getLowerValue(range));
if (range.getUpperBound().isBounded()) {
result = cmd.bitpos(command.getKey(), command.getBit(), getLowerValue(range), getUpperValue(range));
result = reactiveCommands.bitpos(command.getKey(), command.getBit(),
getLowerValue(range), getUpperValue(range));
}
} else {
result = cmd.bitpos(command.getKey(), command.getBit());
result = reactiveCommands.bitpos(command.getKey(), command.getBit());
}
return result.map(respValue -> new NumericResponse<>(command, respValue));
});
});
}));
}
@Override
public Flux<NumericResponse<KeyCommand, Long>> strLen(Publisher<KeyCommand> commands) {
return connection.execute(cmd -> {
return Flux.from(commands).concatMap(command -> {
return cmd.strlen(command.getKey()).map(respValue -> new NumericResponse<>(command, respValue));
});
});
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command ->
reactiveCommands.strlen(command.getKey()).map(respValue -> new NumericResponse<>(command, respValue))));
}
protected LettuceReactiveRedisConnection getConnection() {
return connection;
return this.connection;
}
private static <T extends Comparable<T>> T getUpperValue(Range<T> range) {

View File

@@ -15,6 +15,7 @@
*/
package org.springframework.data.redis.connection.lettuce;
import io.lettuce.core.KeyValue;
import io.lettuce.core.Limit;
import io.lettuce.core.Range;
import io.lettuce.core.ScanStream;
@@ -22,6 +23,7 @@ import io.lettuce.core.ScoredValue;
import io.lettuce.core.Value;
import io.lettuce.core.ZAddArgs;
import io.lettuce.core.ZStoreArgs;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -48,10 +50,13 @@ import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* {@link ReactiveZSetCommands} implementation for {@literal Lettuce}.
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Michele Mancioppi
* @author Andrey Shlykov
* @author John Blum
* @since 2.0
*/
class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@@ -74,7 +79,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@SuppressWarnings("unchecked")
public Flux<NumericResponse<ZAddCommand, Number>> zAdd(Publisher<ZAddCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notEmpty(command.getTuples(), "Tuples must not be empty or null");
@@ -91,7 +96,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
Tuple tuple = command.getTuples().iterator().next();
return cmd.zaddincr(command.getKey(), tuple.getScore(), ByteBuffer.wrap(tuple.getValue()))
return reactiveCommands.zaddincr(command.getKey(), tuple.getScore(), ByteBuffer.wrap(tuple.getValue()))
.map(value -> new NumericResponse<>(command, value));
}
@@ -117,7 +122,8 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
.map(tuple -> ScoredValue.fromNullable(tuple.getScore(), ByteBuffer.wrap(tuple.getValue())))
.toArray(ScoredValue[]::new);
Mono<Long> result = args == null ? cmd.zadd(command.getKey(), values) : cmd.zadd(command.getKey(), args, values);
Mono<Long> result = args == null ? reactiveCommands.zadd(command.getKey(), values)
: reactiveCommands.zadd(command.getKey(), args, values);
return result.map(value -> new NumericResponse<>(command, value));
}));
@@ -126,12 +132,14 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@Override
public Flux<NumericResponse<ZRemCommand, Long>> zRem(Publisher<ZRemCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notEmpty(command.getValues(), "Values must not be null or empty");
return cmd.zrem(command.getKey(), command.getValues().stream().toArray(ByteBuffer[]::new))
ByteBuffer[] values = command.getValues().toArray(ByteBuffer[]::new);
return reactiveCommands.zrem(command.getKey(), values)
.map(value -> new NumericResponse<>(command, value));
}));
}
@@ -139,13 +147,13 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@Override
public Flux<NumericResponse<ZIncrByCommand, Double>> zIncrBy(Publisher<ZIncrByCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getValue(), "Member must not be null");
Assert.notNull(command.getIncrement(), "Increment value must not be null");
return cmd.zincrby(command.getKey(), command.getIncrement().doubleValue(), command.getValue())
return reactiveCommands.zincrby(command.getKey(), command.getIncrement().doubleValue(), command.getValue())
.map(value -> new NumericResponse<>(command, value));
}));
}
@@ -154,11 +162,11 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
public Flux<CommandResponse<ZRandMemberCommand, Flux<ByteBuffer>>> zRandMember(
Publisher<ZRandMemberCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
return new CommandResponse<>(command, cmd.zrandmember(command.getKey(), command.getCount()));
return new CommandResponse<>(command, reactiveCommands.zrandmember(command.getKey(), command.getCount()));
}));
}
@@ -166,26 +174,28 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
public Flux<CommandResponse<ZRandMemberCommand, Flux<Tuple>>> zRandMemberWithScore(
Publisher<ZRandMemberCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
return new CommandResponse<>(command,
cmd.zrandmemberWithScores(command.getKey(), command.getCount()).map(this::toTuple));
Flux<ScoredValue<ByteBuffer>> result =
reactiveCommands.zrandmemberWithScores(command.getKey(), command.getCount());
return new CommandResponse<>(command, result.map(this::toTuple));
}));
}
@Override
public Flux<NumericResponse<ZRankCommand, Long>> zRank(Publisher<ZRankCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getValue(), "Value must not be null");
Mono<Long> result = ObjectUtils.nullSafeEquals(command.getDirection(), Direction.ASC)
? cmd.zrank(command.getKey(), command.getValue())
: cmd.zrevrank(command.getKey(), command.getValue());
? reactiveCommands.zrank(command.getKey(), command.getValue())
: reactiveCommands.zrevrank(command.getKey(), command.getValue());
return result.map(value -> new NumericResponse<>(command, value));
}));
@@ -194,7 +204,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@Override
public Flux<CommandResponse<ZRangeCommand, Flux<Tuple>>> zRange(Publisher<ZRangeCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getRange(), "Range must not be null");
@@ -206,19 +216,15 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
if (ObjectUtils.nullSafeEquals(command.getDirection(), Direction.ASC)) {
if (command.isWithScores()) {
result = cmd.zrangeWithScores(command.getKey(), start, stop).map(this::toTuple);
result = reactiveCommands.zrangeWithScores(command.getKey(), start, stop).map(this::toTuple);
} else {
result = cmd.zrange(command.getKey(), start, stop).map(value -> toTuple(value, Double.NaN));
result = reactiveCommands.zrange(command.getKey(), start, stop).map(value -> toTuple(value, Double.NaN));
}
} else {
if (command.isWithScores()) {
result = cmd.zrevrangeWithScores(command.getKey(), start, stop).map(this::toTuple);
result = reactiveCommands.zrevrangeWithScores(command.getKey(), start, stop).map(this::toTuple);
} else {
result = cmd.zrevrange(command.getKey(), start, stop).map(value -> toTuple(value, Double.NaN));
result = reactiveCommands.zrevrange(command.getKey(), start, stop).map(value -> toTuple(value, Double.NaN));
}
}
@@ -230,7 +236,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@SuppressWarnings("unchecked")
public Flux<CommandResponse<ZRangeStoreCommand, Mono<Long>>> zRangeStore(Publisher<ZRangeStoreCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Source key must not be null");
Assert.notNull(command.getDestKey(), "Destination key must not be null");
@@ -243,17 +249,17 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
if (command.getDirection() == Direction.ASC) {
switch (command.getRangeMode()) {
case ByScore -> result = cmd.zrangestorebyscore(command.getDestKey(), command.getKey(),
case ByScore -> result = reactiveCommands.zrangestorebyscore(command.getDestKey(), command.getKey(),
(Range<? extends Number>) LettuceConverters.toRange(command.getRange()), limit);
case ByLex -> result = cmd.zrangestorebylex(command.getDestKey(), command.getKey(),
case ByLex -> result = reactiveCommands.zrangestorebylex(command.getDestKey(), command.getKey(),
RangeConverter.toRange(command.getRange()), limit);
default -> throw new IllegalStateException("Unsupported value: " + command.getRangeMode());
}
} else {
switch (command.getRangeMode()) {
case ByScore -> result = cmd.zrevrangestorebyscore(command.getDestKey(), command.getKey(),
case ByScore -> result = reactiveCommands.zrevrangestorebyscore(command.getDestKey(), command.getKey(),
(Range<? extends Number>) LettuceConverters.toRange(command.getRange()), limit);
case ByLex -> result = cmd.zrevrangestorebylex(command.getDestKey(), command.getKey(),
case ByLex -> result = reactiveCommands.zrevrangestorebylex(command.getDestKey(), command.getKey(),
RangeConverter.toRange(command.getRange()), limit);
default -> throw new IllegalStateException("Unsupported value: " + command.getRangeMode());
}
@@ -267,7 +273,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
public Flux<CommandResponse<ZRangeByScoreCommand, Flux<Tuple>>> zRangeByScore(
Publisher<ZRangeByScoreCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getRange(), "Range must not be null");
@@ -281,21 +287,19 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
Range<? extends Number> range = RangeConverter.toRange(command.getRange());
if (command.isWithScores()) {
if (!isLimited) {
result = cmd.zrangebyscoreWithScores(command.getKey(), range).map(this::toTuple);
result = reactiveCommands.zrangebyscoreWithScores(command.getKey(), range).map(this::toTuple);
} else {
result = cmd
result = reactiveCommands
.zrangebyscoreWithScores(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get()))
.map(this::toTuple);
}
} else {
if (!isLimited) {
result = cmd.zrangebyscore(command.getKey(), range).map(value -> toTuple(value, Double.NaN));
result = reactiveCommands.zrangebyscore(command.getKey(), range).map(value -> toTuple(value, Double.NaN));
} else {
result = cmd.zrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get()))
result = reactiveCommands.zrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get()))
.map(value -> toTuple(value, Double.NaN));
}
}
@@ -304,21 +308,18 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
Range<? extends Number> range = RangeConverter.toRange(command.getRange());
if (command.isWithScores()) {
if (!isLimited) {
result = cmd.zrevrangebyscoreWithScores(command.getKey(), range).map(this::toTuple);
result = reactiveCommands.zrevrangebyscoreWithScores(command.getKey(), range).map(this::toTuple);
} else {
result = cmd.zrevrangebyscoreWithScores(command.getKey(), range,
result = reactiveCommands.zrevrangebyscoreWithScores(command.getKey(), range,
LettuceConverters.toLimit(command.getLimit().get())).map(this::toTuple);
}
} else {
if (!isLimited) {
result = cmd.zrevrangebyscore(command.getKey(), range).map(value -> toTuple(value, Double.NaN));
result = reactiveCommands.zrevrangebyscore(command.getKey(), range)
.map(value -> toTuple(value, Double.NaN));
} else {
result = cmd.zrevrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get()))
result = reactiveCommands.zrevrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get()))
.map(value -> toTuple(value, Double.NaN));
}
}
@@ -331,12 +332,12 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@Override
public Flux<CommandResponse<KeyCommand, Flux<Tuple>>> zScan(Publisher<KeyScanCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getOptions(), "ScanOptions must not be null");
Flux<Tuple> result = ScanStream.zscan(cmd, command.getKey(), LettuceConverters.toScanArgs(command.getOptions()))
Flux<Tuple> result = ScanStream.zscan(reactiveCommands, command.getKey(), LettuceConverters.toScanArgs(command.getOptions()))
.map(this::toTuple);
return Mono.just(new CommandResponse<>(command, result));
@@ -346,13 +347,13 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@Override
public Flux<NumericResponse<ZCountCommand, Long>> zCount(Publisher<ZCountCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getRange(), "Range must not be null");
Range<? extends Number> range = RangeConverter.toRange(command.getRange());
Mono<Long> result = cmd.zcount(command.getKey(), range);
Mono<Long> result = reactiveCommands.zcount(command.getKey(), range);
return result.map(value -> new NumericResponse<>(command, value));
}));
@@ -361,12 +362,12 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@Override
public Flux<NumericResponse<ZLexCountCommand, Long>> zLexCount(Publisher<ZLexCountCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getRange(), "Range must not be null");
Mono<Long> result = cmd.zlexcount(command.getKey(), RangeConverter.toRange(command.getRange()));
Mono<Long> result = reactiveCommands.zlexcount(command.getKey(), RangeConverter.toRange(command.getRange()));
return result.map(value -> new NumericResponse<>(command, value));
}));
@@ -375,17 +376,20 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@Override
public Flux<CommandResponse<ZPopCommand, Flux<Tuple>>> zPop(Publisher<ZPopCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Flux<ScoredValue<ByteBuffer>> result;
if (command.getCount() > 1) {
result = command.getDirection() == PopDirection.MIN ? cmd.zpopmin(command.getKey(), command.getCount())
: cmd.zpopmax(command.getKey(), command.getCount());
result = command.getDirection() == PopDirection.MIN
? reactiveCommands.zpopmin(command.getKey(), command.getCount())
: reactiveCommands.zpopmax(command.getKey(), command.getCount());
} else {
result = (command.getDirection() == PopDirection.MIN ? cmd.zpopmin(command.getKey())
: cmd.zpopmax(command.getKey())).flux();
result = (command.getDirection() == PopDirection.MIN
? reactiveCommands.zpopmin(command.getKey())
: reactiveCommands.zpopmax(command.getKey())).flux();
}
return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple));
@@ -395,7 +399,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@Override
public Flux<CommandResponse<BZPopCommand, Flux<Tuple>>> bZPop(Publisher<BZPopCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getTimeout(), "Timeout must not be null");
@@ -404,55 +408,63 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
double timeout = TimeoutUtils.toDoubleSeconds(command.getTimeout(), command.getTimeUnit());
Mono<ScoredValue<ByteBuffer>> result = (command.getDirection() == PopDirection.MIN
? cmd.bzpopmin(timeout, command.getKey())
: cmd.bzpopmax(timeout, command.getKey())).filter(Value::hasValue).map(Value::getValue);
Mono<KeyValue<ByteBuffer, ScoredValue<ByteBuffer>>> commandResult = command.getDirection() == PopDirection.MIN
? reactiveCommands.bzpopmin(timeout, command.getKey())
: reactiveCommands.bzpopmax(timeout, command.getKey());
Mono<ScoredValue<ByteBuffer>> result = commandResult.filter(Value::hasValue).map(Value::getValue);
return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple).flux());
}
else {
long timeout = command.getTimeUnit().toSeconds(command.getTimeout());
long timeout = command.getTimeUnit().toSeconds(command.getTimeout());
Mono<ScoredValue<ByteBuffer>> result = (command.getDirection() == PopDirection.MIN
? cmd.bzpopmin(timeout, command.getKey())
: cmd.bzpopmax(timeout, command.getKey())).filter(Value::hasValue).map(Value::getValue);
Mono<KeyValue<ByteBuffer, ScoredValue<ByteBuffer>>> commandResult = command.getDirection() == PopDirection.MIN
? reactiveCommands.bzpopmin(timeout, command.getKey())
: reactiveCommands.bzpopmax(timeout, command.getKey());
return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple).flux());
Mono<ScoredValue<ByteBuffer>> result = commandResult.filter(Value::hasValue).map(Value::getValue);
return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple).flux());
}
}));
}
@Override
public Flux<NumericResponse<KeyCommand, Long>> zCard(Publisher<KeyCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
return cmd.zcard(command.getKey()).map(value -> new NumericResponse<>(command, value));
return reactiveCommands.zcard(command.getKey())
.map(value -> new NumericResponse<>(command, value));
}));
}
@Override
public Flux<NumericResponse<ZScoreCommand, Double>> zScore(Publisher<ZScoreCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getValue(), "Value must not be null");
return cmd.zscore(command.getKey(), command.getValue()).map(value -> new NumericResponse<>(command, value));
return reactiveCommands.zscore(command.getKey(), command.getValue())
.map(value -> new NumericResponse<>(command, value));
}));
}
@Override
public Flux<MultiValueResponse<ZMScoreCommand, Double>> zMScore(Publisher<ZMScoreCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getValues(), "Values must not be null");
return cmd.zmscore(command.getKey(), command.getValues().toArray(new ByteBuffer[0]))
return reactiveCommands.zmscore(command.getKey(), command.getValues().toArray(new ByteBuffer[0]))
.map(value -> new MultiValueResponse<>(command, value));
}));
}
@@ -461,12 +473,12 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
public Flux<NumericResponse<ZRemRangeByRankCommand, Long>> zRemRangeByRank(
Publisher<ZRemRangeByRankCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getRange(), "Range must not be null");
Mono<Long> result = cmd.zremrangebyrank(command.getKey(), //
Mono<Long> result = reactiveCommands.zremrangebyrank(command.getKey(), //
LettuceConverters.getLowerBoundIndex(command.getRange()), //
LettuceConverters.getUpperBoundIndex(command.getRange()));
@@ -478,13 +490,13 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
public Flux<NumericResponse<ZRemRangeByScoreCommand, Long>> zRemRangeByScore(
Publisher<ZRemRangeByScoreCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getRange(), "Range must not be null");
Range<? extends Number> range = RangeConverter.toRange(command.getRange());
Mono<Long> result = cmd.zremrangebyscore(command.getKey(), range);
Mono<Long> result = reactiveCommands.zremrangebyscore(command.getKey(), range);
return result.map(value -> new NumericResponse<>(command, value));
}));
@@ -493,12 +505,12 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@Override
public Flux<NumericResponse<ZRemRangeByLexCommand, Long>> zRemRangeByLex(Publisher<ZRemRangeByLexCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getRange(), "Range must not be null");
Mono<Long> result = cmd.zremrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()));
Mono<Long> result = reactiveCommands.zremrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()));
return result.map(value -> new NumericResponse<>(command, value));
}));
@@ -507,37 +519,41 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
@Override
public Flux<CommandResponse<ZDiffCommand, Flux<ByteBuffer>>> zDiff(Publisher<? extends ZDiffCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> {
Assert.notEmpty(command.getKeys(), "Keys must not be null or empty");
ByteBuffer[] sourceKeys = command.getKeys().toArray(new ByteBuffer[0]);
return new CommandResponse<>(command, cmd.zdiff(sourceKeys));
return new CommandResponse<>(command, reactiveCommands.zdiff(sourceKeys));
}));
}
@Override
public Flux<CommandResponse<ZDiffCommand, Flux<Tuple>>> zDiffWithScores(Publisher<? extends ZDiffCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> {
Assert.notEmpty(command.getKeys(), "Keys must not be null or empty");
ByteBuffer[] sourceKeys = command.getKeys().toArray(new ByteBuffer[0]);
return new CommandResponse<>(command, cmd.zdiffWithScores(sourceKeys).map(this::toTuple));
return new CommandResponse<>(command, reactiveCommands.zdiffWithScores(sourceKeys).map(this::toTuple));
}));
}
@Override
public Flux<NumericResponse<ZDiffStoreCommand, Long>> zDiffStore(Publisher<ZDiffStoreCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Destination key must not be null");
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty");
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]);
return cmd.zdiffstore(command.getKey(), sourceKeys).map(value -> new NumericResponse<>(command, value));
return reactiveCommands.zdiffstore(command.getKey(), sourceKeys)
.map(value -> new NumericResponse<>(command, value));
}));
}
@@ -545,18 +561,23 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
public Flux<CommandResponse<ZAggregateCommand, Flux<ByteBuffer>>> zInter(
Publisher<? extends ZAggregateCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> {
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty");
ZStoreArgs args = null;
if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) {
args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null,
command.getWeights());
}
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]);
Flux<ByteBuffer> result = args != null ? cmd.zinter(args, sourceKeys) : cmd.zinter(sourceKeys);
Flux<ByteBuffer> result = args != null
? reactiveCommands.zinter(args, sourceKeys)
: reactiveCommands.zinter(sourceKeys);
return new CommandResponse<>(command, result);
}));
}
@@ -565,19 +586,23 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
public Flux<CommandResponse<ZAggregateCommand, Flux<Tuple>>> zInterWithScores(
Publisher<? extends ZAggregateCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> {
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty");
ZStoreArgs args = null;
if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) {
args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null,
command.getWeights());
}
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]);
Flux<ScoredValue<ByteBuffer>> result = args != null ? cmd.zinterWithScores(args, sourceKeys)
: cmd.zinterWithScores(sourceKeys);
Flux<ScoredValue<ByteBuffer>> result = args != null
? reactiveCommands.zinterWithScores(args, sourceKeys)
: reactiveCommands.zinterWithScores(sourceKeys);
return new CommandResponse<>(command, result.map(this::toTuple));
}));
}
@@ -586,20 +611,24 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
public Flux<NumericResponse<ZAggregateStoreCommand, Long>> zInterStore(
Publisher<? extends ZAggregateStoreCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Destination key must not be null");
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty");
ZStoreArgs args = null;
if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) {
args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null,
command.getWeights());
}
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]);
Mono<Long> result = args != null ? cmd.zinterstore(command.getKey(), args, sourceKeys)
: cmd.zinterstore(command.getKey(), sourceKeys);
Mono<Long> result = args != null
? reactiveCommands.zinterstore(command.getKey(), args, sourceKeys)
: reactiveCommands.zinterstore(command.getKey(), sourceKeys);
return result.map(value -> new NumericResponse<>(command, value));
}));
}
@@ -608,18 +637,23 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
public Flux<CommandResponse<ZAggregateCommand, Flux<ByteBuffer>>> zUnion(
Publisher<? extends ZAggregateCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> {
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty");
ZStoreArgs args = null;
if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) {
args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null,
command.getWeights());
}
ByteBuffer[] sourceKeys = command.getSourceKeys().stream().toArray(ByteBuffer[]::new);
Flux<ByteBuffer> result = args != null ? cmd.zunion(args, sourceKeys) : cmd.zunion(sourceKeys);
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(ByteBuffer[]::new);
Flux<ByteBuffer> result = args != null
? reactiveCommands.zunion(args, sourceKeys)
: reactiveCommands.zunion(sourceKeys);
return new CommandResponse<>(command, result);
}));
}
@@ -628,19 +662,23 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
public Flux<CommandResponse<ZAggregateCommand, Flux<Tuple>>> zUnionWithScores(
Publisher<? extends ZAggregateCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> {
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty");
ZStoreArgs args = null;
if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) {
args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null,
command.getWeights());
}
ByteBuffer[] sourceKeys = command.getSourceKeys().stream().toArray(ByteBuffer[]::new);
Flux<ScoredValue<ByteBuffer>> result = args != null ? cmd.zunionWithScores(args, sourceKeys)
: cmd.zunionWithScores(sourceKeys);
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(ByteBuffer[]::new);
Flux<ScoredValue<ByteBuffer>> result = args != null
? reactiveCommands.zunionWithScores(args, sourceKeys)
: reactiveCommands.zunionWithScores(sourceKeys);
return new CommandResponse<>(command, result.map(this::toTuple));
}));
}
@@ -649,20 +687,24 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
public Flux<NumericResponse<ZAggregateStoreCommand, Long>> zUnionStore(
Publisher<? extends ZAggregateStoreCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Destination key must not be null");
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty");
ZStoreArgs args = null;
if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) {
args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null,
command.getWeights());
}
ByteBuffer[] sourceKeys = command.getSourceKeys().stream().toArray(ByteBuffer[]::new);
Mono<Long> result = args != null ? cmd.zunionstore(command.getKey(), args, sourceKeys)
: cmd.zunionstore(command.getKey(), sourceKeys);
ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(ByteBuffer[]::new);
Mono<Long> result = args != null
? reactiveCommands.zunionstore(command.getKey(), args, sourceKeys)
: reactiveCommands.zunionstore(command.getKey(), sourceKeys);
return result.map(value -> new NumericResponse<>(command, value));
}));
}
@@ -671,26 +713,25 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
public Flux<CommandResponse<ZRangeByLexCommand, Flux<ByteBuffer>>> zRangeByLex(
Publisher<ZRangeByLexCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Destination key must not be null");
Flux<ByteBuffer> result;
if (!command.getLimit().isUnlimited()) {
if (ObjectUtils.nullSafeEquals(command.getDirection(), Direction.ASC)) {
result = cmd.zrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()),
result = reactiveCommands.zrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()),
LettuceConverters.toLimit(command.getLimit()));
} else {
result = cmd.zrevrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()),
result = reactiveCommands.zrevrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()),
LettuceConverters.toLimit(command.getLimit()));
}
} else {
if (ObjectUtils.nullSafeEquals(command.getDirection(), Direction.ASC)) {
result = cmd.zrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()));
result = reactiveCommands.zrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()));
} else {
result = cmd.zrevrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()));
result = reactiveCommands.zrevrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()));
}
}
@@ -701,6 +742,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
private static ZStoreArgs zStoreArgs(@Nullable Aggregate aggregate, @Nullable List<Double> weights) {
ZStoreArgs args = new ZStoreArgs();
if (aggregate != null) {
switch (aggregate) {
case MIN -> args.min();
@@ -725,6 +767,6 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands {
}
protected LettuceReactiveRedisConnection getConnection() {
return connection;
return this.connection;
}
}

View File

@@ -21,7 +21,6 @@ import io.lettuce.core.api.async.RedisStringAsyncCommands;
import java.util.List;
import java.util.Map;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.BitFieldSubCommands;
import org.springframework.data.redis.connection.RedisStringCommands;
@@ -31,9 +30,12 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* {@link RedisStringCommands} implementation for {@literal Lettuce}.
*
* @author Christoph Strobl
* @author Mark Paluch
* @author dengliming
* @author John Blum
* @since 2.0
*/
class LettuceStringCommands implements RedisStringCommands {
@@ -279,9 +281,8 @@ class LettuceStringCommands implements RedisStringCommands {
throw new IllegalArgumentException("Bitop NOT should only be performed against one key");
}
return connection.invoke().just(it -> {
return switch (op) {
return connection.invoke().just(it ->
switch (op) {
case AND -> it.bitopAnd(destination, keys);
case OR -> it.bitopOr(destination, keys);
case XOR -> it.bitopXor(destination, keys);
@@ -291,8 +292,7 @@ class LettuceStringCommands implements RedisStringCommands {
}
yield it.bitopNot(destination, keys[0]);
}
};
});
});
}
@Nullable
@@ -324,11 +324,13 @@ class LettuceStringCommands implements RedisStringCommands {
}
private static <T extends Comparable<T>> T getUpperValue(Range<T> range) {
return range.getUpperBound().getValue()
.orElseThrow(() -> new IllegalArgumentException("Range does not contain upper bound value"));
}
private static <T extends Comparable<T>> T getLowerValue(Range<T> range) {
return range.getLowerBound().getValue()
.orElseThrow(() -> new IllegalArgumentException("Range does not contain lower bound value"));
}

View File

@@ -43,10 +43,13 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* {@link RedisZSetCommands} implementation for {@literal Lettuce}.
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Andrey Shlykov
* @author Shyngys Sapraliyev
* @author John Blum
* @since 2.0
*/
class LettuceZSetCommands implements RedisZSetCommands {
@@ -532,10 +535,6 @@ class LettuceZSetCommands implements RedisZSetCommands {
/**
* @since 1.4
* @param key
* @param cursorId
* @param options
* @return
*/
public Cursor<Tuple> zScan(byte[] key, long cursorId, ScanOptions options) {
@@ -761,6 +760,7 @@ class LettuceZSetCommands implements RedisZSetCommands {
if (source.contains(Flag.CH)) {
target.ch();
}
return target;
}
}

View File

@@ -38,6 +38,7 @@ import org.springframework.util.ReflectionUtils;
* automatically by translating interface calls to actual {@code …Operations} interfaces.
*
* @author Mark Paluch
* @author John Blum
* @since 3.0
*/
class BoundOperationsProxyFactory {
@@ -54,7 +55,6 @@ class BoundOperationsProxyFactory {
* @param type the {@link DataType} for which to create a proxy object.
* @param operations the {@link RedisOperations} instance.
* @param operationsTargetFunction function to extract the actual delegate for method calls.
* @param <T>
* @return the proxy object.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -90,7 +90,7 @@ class BoundOperationsProxyFactory {
return targetMethodCache.computeIfAbsent(method, it -> {
Class[] paramTypes;
Class<?>[] paramTypes;
if (isStreamRead(method)) {
paramTypes = new Class[it.getParameterCount()];
@@ -144,13 +144,10 @@ class BoundOperationsProxyFactory {
yield null;
}
case "getOperations" -> delegate.getOps();
default -> method.getDeclaringClass() == boundOperationsInterface
? doInvoke(invocation, method, operationsTarget, true)
: doInvoke(invocation, method, delegate, false);
};
if (method.getDeclaringClass() == boundOperationsInterface) {
return doInvoke(invocation, method, operationsTarget, true);
}
return doInvoke(invocation, method, delegate, false);
}
@Nullable

View File

@@ -155,11 +155,13 @@ public abstract class RedisConnectionUtils {
// Use same RedisConnection for further Redis actions within the transaction.
// Thread-bound object will get removed by synchronization at transaction completion.
RedisConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new RedisConnectionHolder(connection);
} else {
holderToUse.setConnection(connection);
}
holderToUse.requested();
// Consider callback-scope connection binding vs. transaction scope binding
@@ -193,25 +195,26 @@ public abstract class RedisConnectionUtils {
return factory.getConnection();
}
private static void potentiallyRegisterTransactionSynchronisation(RedisConnectionHolder connHolder,
private static void potentiallyRegisterTransactionSynchronisation(RedisConnectionHolder connectionHolder,
final RedisConnectionFactory factory) {
// Should go actually into RedisTransactionManager
if (!connHolder.isTransactionActive()) {
if (!connectionHolder.isTransactionActive()) {
connHolder.setTransactionActive(true);
connHolder.setSynchronizedWithTransaction(true);
connHolder.requested();
connectionHolder.setTransactionActive(true);
connectionHolder.setSynchronizedWithTransaction(true);
connectionHolder.requested();
RedisConnection conn = connHolder.getRequiredConnection();
RedisConnection connection = connectionHolder.getRequiredConnection();
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
if (!readOnly) {
conn.multi();
connection.multi();
}
TransactionSynchronizationManager
.registerSynchronization(new RedisTransactionSynchronizer(connHolder, conn, factory, readOnly));
.registerSynchronization(new RedisTransactionSynchronizer(connectionHolder, connection, factory, readOnly));
}
}
@@ -224,6 +227,7 @@ public abstract class RedisConnectionUtils {
RedisConnectionFactory factory) {
ProxyFactory proxyFactory = new ProxyFactory(connection);
proxyFactory.addAdvice(new ConnectionSplittingInterceptor(factory));
proxyFactory.addInterface(RedisConnectionProxy.class);
@@ -259,8 +263,8 @@ public abstract class RedisConnectionUtils {
// release transactional/read-only and non-transactional/non-bound connections.
// transactional connections for read-only transactions get no synchronizer registered
unbindConnection(factory);
return;
}
@@ -271,21 +275,21 @@ public abstract class RedisConnectionUtils {
* Determine whether the given two RedisConnections are equal, asking the target {@link RedisConnection} in case of a
* proxy. Used to detect equality even if the user passed in a raw target Connection while the held one is a proxy.
*
* @param conHolder the {@link RedisConnectionHolder} for the held Connection (potentially a proxy)
* @param passedInCon the {@link RedisConnection} passed-in by the user (potentially a target Connection without
* @param connectionHolder the {@link RedisConnectionHolder} for the held Connection (potentially a proxy)
* @param passedInConnetion the {@link RedisConnection} passed-in by the user (potentially a target Connection without
* proxy)
* @return whether the given Connections are equal
* @see #getTargetConnection
*/
private static boolean connectionEquals(RedisConnectionHolder conHolder, RedisConnection passedInCon) {
private static boolean connectionEquals(RedisConnectionHolder connectionHolder, RedisConnection passedInConnetion) {
if (!conHolder.hasConnection()) {
if (!connectionHolder.hasConnection()) {
return false;
}
RedisConnection heldCon = conHolder.getRequiredConnection();
RedisConnection heldConnection = connectionHolder.getRequiredConnection();
return heldCon.equals(passedInCon) || getTargetConnection(heldCon).equals(passedInCon);
return heldConnection.equals(passedInConnetion) || getTargetConnection(heldConnection).equals(passedInConnetion);
}
/**
@@ -293,19 +297,19 @@ public abstract class RedisConnectionUtils {
* {@link RedisConnection} is a proxy, it will be unwrapped until a non-proxy {@link RedisConnection} is found.
* Otherwise, the passed-in {@link RedisConnection} will be returned as-is.
*
* @param con the {@link RedisConnection} proxy to unwrap
* @param connection the {@link RedisConnection} proxy to unwrap
* @return the innermost target Connection, or the passed-in one if no proxy
* @see RedisConnectionProxy#getTargetConnection()
*/
private static RedisConnection getTargetConnection(RedisConnection con) {
private static RedisConnection getTargetConnection(RedisConnection connection) {
RedisConnection conToUse = con;
RedisConnection connectionToUse = connection;
while (conToUse instanceof RedisConnectionProxy) {
conToUse = ((RedisConnectionProxy) conToUse).getTargetConnection();
while (connectionToUse instanceof RedisConnectionProxy) {
connectionToUse = ((RedisConnectionProxy) connectionToUse).getTargetConnection();
}
return conToUse;
return connectionToUse;
}
/**
@@ -317,9 +321,10 @@ public abstract class RedisConnectionUtils {
*/
public static void unbindConnection(RedisConnectionFactory factory) {
RedisConnectionHolder conHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
RedisConnectionHolder connectionHolder =
(RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
if (conHolder == null) {
if (connectionHolder == null) {
return;
}
@@ -327,16 +332,17 @@ public abstract class RedisConnectionUtils {
log.debug("Unbinding Redis Connection");
}
if (conHolder.isTransactionActive()) {
if (connectionHolder.isTransactionActive()) {
if (log.isDebugEnabled()) {
log.debug("Redis Connection will be closed when outer transaction finished");
}
} else {
RedisConnection connection = conHolder.getConnection();
conHolder.released();
RedisConnection connection = connectionHolder.getConnection();
if (!conHolder.isOpen()) {
connectionHolder.released();
if (!connectionHolder.isOpen()) {
TransactionSynchronizationManager.unbindResourceIfPossible(factory);
@@ -349,18 +355,18 @@ public abstract class RedisConnectionUtils {
* Return whether the given Redis connection is transactional, that is, bound to the current thread by Spring's
* transaction facilities.
*
* @param conn Redis connection to check
* @param connFactory Redis connection factory that the connection was created with
* @param connection Redis connection to check
* @param connectionFactory Redis connection factory that the connection was created with
* @return whether the connection is transactional or not
*/
public static boolean isConnectionTransactional(RedisConnection conn, RedisConnectionFactory connFactory) {
public static boolean isConnectionTransactional(RedisConnection connection, RedisConnectionFactory connectionFactory) {
Assert.notNull(connFactory, "No RedisConnectionFactory specified");
Assert.notNull(connectionFactory, "No RedisConnectionFactory specified");
RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager
.getResource(connFactory);
RedisConnectionHolder connectionHolder = (RedisConnectionHolder) TransactionSynchronizationManager
.getResource(connectionFactory);
return connHolder != null && connectionEquals(connHolder, conn);
return connectionHolder != null && connectionEquals(connectionHolder, connection);
}
private static void doCloseConnection(@Nullable RedisConnection connection) {
@@ -392,15 +398,16 @@ public abstract class RedisConnectionUtils {
*/
private static class RedisTransactionSynchronizer implements TransactionSynchronization {
private final RedisConnectionHolder connHolder;
private final RedisConnectionHolder connectionHolder;
private final RedisConnection connection;
private final RedisConnectionFactory factory;
private final boolean readOnly;
RedisTransactionSynchronizer(RedisConnectionHolder connHolder, RedisConnection connection,
RedisTransactionSynchronizer(RedisConnectionHolder connectionHolder, RedisConnection connection,
RedisConnectionFactory factory, boolean readOnly) {
this.connHolder = connHolder;
this.connectionHolder = connectionHolder;
this.connection = connection;
this.factory = factory;
this.readOnly = readOnly;
@@ -423,10 +430,10 @@ public abstract class RedisConnectionUtils {
log.debug("Closing bound connection after transaction completed with " + status);
}
connHolder.setTransactionActive(false);
connectionHolder.setTransactionActive(false);
doCloseConnection(connection);
TransactionSynchronizationManager.unbindResource(factory);
connHolder.reset();
connectionHolder.reset();
}
}
}
@@ -527,12 +534,12 @@ public abstract class RedisConnectionUtils {
* Return whether this holder currently has a {@link RedisConnection}.
*/
protected boolean hasConnection() {
return (this.connection != null);
return this.connection != null;
}
@Nullable
public RedisConnection getConnection() {
return connection;
return this.connection;
}
public RedisConnection getRequiredConnection() {

View File

@@ -33,7 +33,6 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.ConverterNotFoundException;
import org.springframework.data.keyvalue.core.AbstractKeyValueAdapter;
import org.springframework.data.keyvalue.core.KeyValueAdapter;
import org.springframework.data.mapping.PersistentPropertyAccessor;
@@ -99,6 +98,7 @@ import org.springframework.util.ObjectUtils;
* @author Christoph Strobl
* @author Mark Paluch
* @author Andrey Muchnik
* @author John Blum
* @since 1.7
*/
public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
@@ -158,6 +158,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
MappingRedisConverter mappingConverter = new MappingRedisConverter(mappingContext,
new PathIndexResolver(mappingContext), new ReferenceResolverImpl(redisOps));
mappingConverter.setCustomConversions(customConversions == null ? new RedisCustomConversions() : customConversions);
mappingConverter.afterPropertiesSet();
@@ -191,6 +192,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
public Object put(Object id, Object item, String keyspace) {
RedisData rdo = item instanceof RedisData ? (RedisData) item : new RedisData();
if (!(item instanceof RedisData)) {
converter.write(item, rdo);
}
@@ -229,7 +231,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
byte[] phantomKey = ByteUtils.concat(objectKey, BinaryKeyspaceIdentifier.PHANTOM_SUFFIX);
if (expires(rdo)) {
connection.del(phantomKey);
connection.hMSet(phantomKey, rdo.getBucket().rawMap());
connection.expire(phantomKey, rdo.getTimeToLive() + PHANTOM_KEY_TTL);
@@ -239,11 +240,13 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
}
IndexWriter indexWriter = new IndexWriter(connection, converter);
if (isNew) {
indexWriter.createIndexes(key, rdo.getIndexedData());
} else {
indexWriter.deleteAndUpdateIndexes(key, rdo.getIndexedData());
}
return null;
});
@@ -253,10 +256,9 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
@Override
public boolean contains(Object id, String keyspace) {
Boolean exists = redisOps
.execute((RedisCallback<Boolean>) connection -> connection.sIsMember(toBytes(keyspace), toBytes(id)));
RedisCallback<Boolean> command = connection -> connection.sIsMember(toBytes(keyspace), toBytes(id));
return exists != null ? exists : false;
return Boolean.TRUE.equals(this.redisOps.execute(command));
}
@Nullable
@@ -274,14 +276,16 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
byte[] binId = createKey(stringKeyspace, stringId);
Map<byte[], byte[]> raw = redisOps
.execute((RedisCallback<Map<byte[], byte[]>>) connection -> connection.hGetAll(binId));
RedisCallback<Map<byte[], byte[]>> command = connection -> connection.hGetAll(binId);
Map<byte[], byte[]> raw = redisOps.execute(command);
if (CollectionUtils.isEmpty(raw)) {
return null;
}
RedisData data = new RedisData(raw);
data.setId(stringId);
data.setKeyspace(stringKeyspace);
@@ -299,9 +303,9 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
byte[] binId = toBytes(id);
byte[] binKeyspace = toBytes(keyspace);
T o = get(id, keyspace, type);
T value = get(id, keyspace, type);
if (o != null) {
if (value != null) {
byte[] keyToDelete = createKey(asString(keyspace), asString(id));
@@ -314,9 +318,11 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
if (RedisKeyValueAdapter.this.keepShadowCopy()) {
RedisPersistentEntity<?> persistentEntity = converter.getMappingContext().getPersistentEntity(type);
if (persistentEntity != null && persistentEntity.isExpiring()) {
byte[] phantomKey = ByteUtils.concat(keyToDelete, BinaryKeyspaceIdentifier.PHANTOM_SUFFIX);
connection.del(phantomKey);
}
}
@@ -324,7 +330,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
});
}
return o;
return value;
}
@Override
@@ -344,7 +350,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
* @param type the desired target type.
* @param offset index value to start reading.
* @param rows maximum number or entities to return.
* @param <T>
* @return never {@literal null}.
* @since 2.5
*/
@@ -362,6 +367,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
}
offset = Math.max(0, offset);
if (rows > 0) {
keys = keys.subList((int) offset, Math.min((int) offset + rows, keys.size()));
}
@@ -456,6 +462,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
if (keepShadowCopy()) { // add phantom key so values can be restored
byte[] phantomKey = ByteUtils.concat(redisKey, BinaryKeyspaceIdentifier.PHANTOM_SUFFIX);
connection.hMSet(phantomKey, rdo.getBucket().rawMap());
connection.expire(phantomKey, rdo.getTimeToLive() + PHANTOM_KEY_TTL);
}
@@ -479,6 +486,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
RedisConnection connection) {
redisUpdateObject.addFieldToRemove(toBytes(path));
byte[] value = connection.hGet(redisUpdateObject.targetKey, toBytes(path));
if (value != null && value.length > 0) {
@@ -530,7 +538,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
*
* @param callback must not be {@literal null}.
* @see RedisOperations#execute(RedisCallback)
* @return
*/
@Nullable
public <T> T execute(RedisCallback<T> callback) {
@@ -551,7 +558,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
}
private String asString(Object value) {
return value instanceof String ? (String) value
return value instanceof String stringValue ? stringValue
: getConverter().getConversionService().convert(value, String.class);
}
@@ -561,10 +568,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
/**
* Convert given source to binary representation using the underlying {@link ConversionService}.
*
* @param source
* @return
* @throws ConverterNotFoundException
*/
public byte[] toBytes(Object source) {
@@ -577,13 +580,8 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
/**
* Read back and set {@link TimeToLive} for the property.
*
* @param key
* @param target
* @return
*/
@Nullable
@SuppressWarnings({ "unchecked", "rawtypes" })
private <T> T readBackTimeToLiveIfSet(@Nullable byte[] key, @Nullable T target) {
if (target == null || key == null) {
@@ -591,9 +589,11 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
}
RedisPersistentEntity<?> entity = this.converter.getMappingContext().getRequiredPersistentEntity(target.getClass());
if (entity.hasExplictTimeToLiveProperty()) {
RedisPersistentProperty ttlProperty = entity.getExplicitTimeToLiveProperty();
if (ttlProperty == null) {
return target;
}
@@ -635,7 +635,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
/**
* Configure usage of {@link KeyExpirationEventMessageListener}.
*
* @param enableKeyspaceEvents
* @since 1.8
*/
public void setEnableKeyspaceEvents(EnableKeyspaceEvents enableKeyspaceEvents) {
@@ -764,10 +763,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
/**
* Creates new {@link MappingExpirationListener}.
*
* @param listenerContainer
* @param ops
* @param converter
*/
MappingExpirationListener(RedisMessageListenerContainer listenerContainer, RedisOperations<?, ?> ops,
RedisConverter converter) {
@@ -791,23 +786,24 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
Map<byte[], byte[]> hash = ops.execute((RedisCallback<Map<byte[], byte[]>>) connection -> {
Map<byte[], byte[]> hash1 = connection.hGetAll(phantomKey);
Map<byte[], byte[]> phantomValue = connection.hGetAll(phantomKey);
if (!CollectionUtils.isEmpty(hash1)) {
if (!CollectionUtils.isEmpty(phantomValue)) {
connection.del(phantomKey);
}
return hash1;
return phantomValue;
});
Object value = CollectionUtils.isEmpty(hash) ? null : converter.read(Object.class, new RedisData(hash));
byte[] channelAsBytes = message.getChannel();
String channel = !ObjectUtils.isEmpty(channelAsBytes)
? converter.getConversionService().convert(channelAsBytes, String.class)
: null;
RedisKeyExpiredEvent event = new RedisKeyExpiredEvent(channel, key, value);
RedisKeyExpiredEvent<?> event = new RedisKeyExpiredEvent<>(channel, key, value);
ops.execute((RedisCallback<Void>) connection -> {
@@ -910,6 +906,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
}
static class Index {
final DataType type;
final byte[] key;
@@ -917,7 +914,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
this.key = key;
this.type = type;
}
}
}
}

View File

@@ -97,11 +97,13 @@ public abstract class TimeoutUtils {
}
private static long roundUpIfNecessary(long timeout, long convertedTimeout) {
// A 0 timeout blocks some Redis ops indefinitely, round up if that's
// not the intention
if (timeout > 0 && convertedTimeout == 0) {
return 1;
}
return convertedTimeout;
}
}

View File

@@ -32,10 +32,11 @@ import org.springframework.data.repository.query.parser.PartTree;
import org.springframework.util.CollectionUtils;
/**
* Redis specific query creator.
* {@link AbstractQueryCreator} implementation for Redis.
*
* @author Christoph Strobl
* @author Mark Paluch
* @author John Blum
* @since 1.7
*/
public class RedisQueryCreator extends AbstractQueryCreator<KeyValueQuery<RedisOperationChain>, RedisOperationChain> {
@@ -56,8 +57,10 @@ public class RedisQueryCreator extends AbstractQueryCreator<KeyValueQuery<RedisO
case TRUE -> sink.sismember(part.getProperty().toDotPath(), true);
case FALSE -> sink.sismember(part.getProperty().toDotPath(), false);
case WITHIN, NEAR -> sink.near(getNearPath(part, iterator));
default -> throw new IllegalArgumentException(
String.format("%s is not supported for Redis query derivation", part.getType()));
default -> {
String message = String.format("%s is not supported for Redis query derivation", part.getType());
throw new IllegalArgumentException(message);
}
}
return sink;
@@ -96,22 +99,21 @@ public class RedisQueryCreator extends AbstractQueryCreator<KeyValueQuery<RedisO
private NearPath getNearPath(Part part, Iterator<Object> iterator) {
Object o = iterator.next();
Object value = iterator.next();
Point point;
Distance distance;
if (o instanceof Circle) {
if (value instanceof Circle) {
point = ((Circle) value).getCenter();
distance = ((Circle) value).getRadius();
} else if (value instanceof Point) {
point = ((Circle) o).getCenter();
distance = ((Circle) o).getRadius();
} else if (o instanceof Point) {
point = (Point) o;
point = (Point) value;
if (!iterator.hasNext()) {
throw new InvalidDataAccessApiUsageException(
"Expected to find distance value for geo query; Are you missing a parameter");
String message = "Expected to find distance value for geo query; Are you missing a parameter";
throw new InvalidDataAccessApiUsageException(message);
}
Object distObject = iterator.next();
@@ -120,12 +122,18 @@ public class RedisQueryCreator extends AbstractQueryCreator<KeyValueQuery<RedisO
} else if (distObject instanceof Number) {
distance = new Distance(((Number) distObject).doubleValue(), Metrics.KILOMETERS);
} else {
throw new InvalidDataAccessApiUsageException(String
.format("Expected to find Distance or Numeric value for geo query but was %s", distObject.getClass()));
String message = String.format("Expected to find Distance or Numeric value for geo query but was %s",
distObject.getClass());
throw new InvalidDataAccessApiUsageException(message);
}
} else {
throw new InvalidDataAccessApiUsageException(
String.format("Expected to find a Circle or Point/Distance for geo query but was %s.", o.getClass()));
String message = String.format("Expected to find a Circle or Point/Distance for geo query but was %s.",
value.getClass());
throw new InvalidDataAccessApiUsageException(message);
}
return new NearPath(part.getProperty().toDotPath(), point, distance);