diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java index bb36033aa..608034017 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java @@ -70,6 +70,7 @@ import org.springframework.util.Assert; * @author Chen Guanqun * @author Pavel Khokhlov * @author Liming Deng + * @author John Blum * @since 1.7 */ public class JedisClusterConnection implements RedisClusterConnection { @@ -168,13 +169,15 @@ public class JedisClusterConnection implements RedisClusterConnection { Assert.notNull(command, "Command must not be null"); Assert.notNull(args, "Args must not be null"); - return clusterCommandExecutor.executeCommandOnArbitraryNode( - (JedisClusterCommandCallback) client -> client.sendCommand(JedisClientUtils.getCommand(command), args)) - .getValue(); + JedisClusterCommandCallback commandCallback = jedis -> + jedis.sendCommand(JedisClientUtils.getCommand(command), args); + + return this.clusterCommandExecutor.executeCommandOnArbitraryNode(commandCallback).getValue(); } @Nullable @Override + @SuppressWarnings("unchecked") public T execute(String command, byte[] key, Collection args) { Assert.notNull(command, "Command must not be null"); @@ -183,11 +186,12 @@ public class JedisClusterConnection implements RedisClusterConnection { byte[][] commandArgs = getCommandArguments(key, args); - RedisClusterNode keyMaster = topologyProvider.getTopology().getKeyServingMasterNode(key); + RedisClusterNode keyMaster = this.topologyProvider.getTopology().getKeyServingMasterNode(key); - return clusterCommandExecutor.executeCommandOnSingleNode((JedisClusterCommandCallback) client -> { - return (T) client.sendCommand(JedisClientUtils.getCommand(command), commandArgs); - }, keyMaster).getValue(); + JedisClusterCommandCallback commandCallback = jedis -> + (T) jedis.sendCommand(JedisClientUtils.getCommand(command), commandArgs); + + return this.clusterCommandExecutor.executeCommandOnSingleNode(commandCallback, keyMaster).getValue(); } private static byte[][] getCommandArguments(byte[] key, Collection args) { @@ -195,6 +199,7 @@ public class JedisClusterConnection implements RedisClusterConnection { byte[][] commandArgs = new byte[args.size() + 1][]; commandArgs[0] = key; + int targetIndex = 1; for (byte[] binaryArgument : args) { @@ -226,15 +231,17 @@ public class JedisClusterConnection implements RedisClusterConnection { * @since 2.1 */ @Nullable + @SuppressWarnings("unchecked") public List execute(String command, Collection keys, Collection args) { Assert.notNull(command, "Command must not be null"); Assert.notNull(keys, "Key must not be null"); Assert.notNull(args, "Args must not be null"); - return clusterCommandExecutor.executeMultiKeyCommand((JedisMultiKeyClusterCommandCallback) (client, key) -> { - return (T) client.sendCommand(JedisClientUtils.getCommand(command), getCommandArguments(key, args)); - }, keys).resultsAsList(); + JedisMultiKeyClusterCommandCallback commandCallback = (jedis, key) -> + (T) jedis.sendCommand(JedisClientUtils.getCommand(command), getCommandArguments(key, args)); + + return this.clusterCommandExecutor.executeMultiKeyCommand(commandCallback, keys).resultsAsList(); } @@ -345,18 +352,19 @@ public class JedisClusterConnection implements RedisClusterConnection { @Override public boolean isSubscribed() { - return (subscription != null && subscription.isAlive()); + return (this.subscription != null && this.subscription.isAlive()); } @Override public Subscription getSubscription() { - return subscription; + return this.subscription; } @Override public Long publish(byte[] channel, byte[] message) { + try { - return cluster.publish(channel, message); + return this.cluster.publish(channel, message); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -366,15 +374,15 @@ public class JedisClusterConnection implements RedisClusterConnection { public void subscribe(MessageListener listener, byte[]... channels) { if (isSubscribed()) { - throw new RedisSubscribedConnectionException( - "Connection already subscribed; use the connection Subscription to cancel or add new channels"); + String message = "Connection already subscribed; use the connection Subscription to cancel or add new channels"; + throw new RedisSubscribedConnectionException(message); } try { JedisMessageListener jedisPubSub = new JedisMessageListener(listener); subscription = new JedisSubscription(listener, jedisPubSub, channels, null); cluster.subscribe(jedisPubSub, channels); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -382,15 +390,16 @@ public class JedisClusterConnection implements RedisClusterConnection { public void pSubscribe(MessageListener listener, byte[]... patterns) { if (isSubscribed()) { - throw new RedisSubscribedConnectionException( - "Connection already subscribed; use the connection Subscription to cancel or add new channels"); + String message = "Connection already subscribed; use the connection Subscription to cancel or add new channels"; + throw new RedisSubscribedConnectionException(message); } + try { JedisMessageListener jedisPubSub = new JedisMessageListener(listener); subscription = new JedisSubscription(listener, jedisPubSub, null, patterns); cluster.psubscribe(jedisPubSub, patterns); - } catch (Exception ex) { - throw convertJedisAccessException(ex); + } catch (Exception cause) { + throw convertJedisAccessException(cause); } } @@ -407,19 +416,20 @@ public class JedisClusterConnection implements RedisClusterConnection { throw new InvalidDataAccessApiUsageException("Echo not supported in cluster mode"); } - @Override + @Override @Nullable public String ping() { - return !clusterCommandExecutor.executeCommandOnAllNodes((JedisClusterCommandCallback) Jedis::ping) - .resultsAsList().isEmpty() ? "PONG" : null; + JedisClusterCommandCallback command = Jedis::ping; + return !this.clusterCommandExecutor.executeCommandOnAllNodes(command).resultsAsList().isEmpty() ? "PONG" : null; } @Override public String ping(RedisClusterNode node) { - return clusterCommandExecutor.executeCommandOnSingleNode((JedisClusterCommandCallback) Jedis::ping, node) - .getValue(); + JedisClusterCommandCallback command = Jedis::ping; + + return this.clusterCommandExecutor.executeCommandOnSingleNode(command, node).getValue(); } /* @@ -432,16 +442,17 @@ public class JedisClusterConnection implements RedisClusterConnection { Assert.notNull(node, "Node must not be null"); Assert.notNull(mode, "AddSlots mode must not be null"); - RedisClusterNode nodeToUse = topologyProvider.getTopology().lookup(node); + RedisClusterNode nodeToUse = this.topologyProvider.getTopology().lookup(node); String nodeId = nodeToUse.getId(); - clusterCommandExecutor.executeCommandOnSingleNode((JedisClusterCommandCallback) client -> switch (mode) { - case IMPORTING -> client.clusterSetSlotImporting(slot, nodeId); - case MIGRATING -> client.clusterSetSlotMigrating(slot, nodeId); - case STABLE -> client.clusterSetSlotStable(slot); - case NODE -> client.clusterSetSlotNode(slot, nodeId); - }, node); + JedisClusterCommandCallback command = jedis -> switch (mode) { + case IMPORTING -> jedis.clusterSetSlotImporting(slot, nodeId); + case MIGRATING -> jedis.clusterSetSlotMigrating(slot, nodeId); + case STABLE -> jedis.clusterSetSlotStable(slot); + case NODE -> jedis.clusterSetSlotNode(slot, nodeId); + }; + this.clusterCommandExecutor.executeCommandOnSingleNode(command, node); } @Override @@ -449,20 +460,24 @@ public class JedisClusterConnection implements RedisClusterConnection { RedisClusterNode node = clusterGetNodeForSlot(slot); - NodeResult> result = clusterCommandExecutor - .executeCommandOnSingleNode( - (JedisClusterCommandCallback>) client -> JedisConverters.stringListToByteList() - .convert(client.clusterGetKeysInSlot(slot, count != null ? count.intValue() : Integer.MAX_VALUE)), - node); + JedisClusterCommandCallback> command = jedis -> + JedisConverters.stringListToByteList().convert(jedis.clusterGetKeysInSlot(slot, nullSafeIntValue(count))); + + NodeResult> result = this.clusterCommandExecutor.executeCommandOnSingleNode(command, node); return result.getValue(); } + private int nullSafeIntValue(@Nullable Integer value) { + return value != null ? value : Integer.MAX_VALUE; + } + @Override public void clusterAddSlots(RedisClusterNode node, int... slots) { - clusterCommandExecutor.executeCommandOnSingleNode( - (JedisClusterCommandCallback) client -> client.clusterAddSlots(slots), node); + JedisClusterCommandCallback command = jedis -> jedis.clusterAddSlots(slots); + + this.clusterCommandExecutor.executeCommandOnSingleNode(command, node); } @Override @@ -478,16 +493,17 @@ public class JedisClusterConnection implements RedisClusterConnection { RedisClusterNode node = clusterGetNodeForSlot(slot); - return clusterCommandExecutor.executeCommandOnSingleNode( - (JedisClusterCommandCallback) client -> client.clusterCountKeysInSlot(slot), node).getValue(); + JedisClusterCommandCallback command = jedis -> jedis.clusterCountKeysInSlot(slot); + + return this.clusterCommandExecutor.executeCommandOnSingleNode(command, node).getValue(); } @Override public void clusterDeleteSlots(RedisClusterNode node, int... slots) { - clusterCommandExecutor.executeCommandOnSingleNode( - (JedisClusterCommandCallback) client -> client.clusterDelSlots(slots), node); + JedisClusterCommandCallback command = jedis -> jedis.clusterDelSlots(slots); + this.clusterCommandExecutor.executeCommandOnSingleNode(command, node); } @Override @@ -501,50 +517,54 @@ public class JedisClusterConnection implements RedisClusterConnection { @Override public void clusterForget(RedisClusterNode node) { - Set nodes = new LinkedHashSet<>(topologyProvider.getTopology().getActiveMasterNodes()); - RedisClusterNode nodeToRemove = topologyProvider.getTopology().lookup(node); + Set nodes = new LinkedHashSet<>(this.topologyProvider.getTopology().getActiveMasterNodes()); + RedisClusterNode nodeToRemove = this.topologyProvider.getTopology().lookup(node); + nodes.remove(nodeToRemove); - clusterCommandExecutor.executeCommandAsyncOnNodes( - (JedisClusterCommandCallback) client -> client.clusterForget(node.getId()), nodes); + JedisClusterCommandCallback command = jedis -> jedis.clusterForget(node.getId()); + + this.clusterCommandExecutor.executeCommandAsyncOnNodes(command, nodes); } @Override + @SuppressWarnings("all") public void clusterMeet(RedisClusterNode node) { Assert.notNull(node, "Cluster node must not be null for CLUSTER MEET command"); Assert.hasText(node.getHost(), "Node to meet cluster must have a host"); Assert.isTrue(node.getPort() > 0, "Node to meet cluster must have a port greater 0"); - clusterCommandExecutor.executeCommandOnAllNodes( - (JedisClusterCommandCallback) client -> client.clusterMeet(node.getHost(), node.getPort())); + JedisClusterCommandCallback command = jedis -> jedis.clusterMeet(node.getHost(), node.getPort()); + + this.clusterCommandExecutor.executeCommandOnAllNodes(command); } @Override public void clusterReplicate(RedisClusterNode master, RedisClusterNode replica) { - RedisClusterNode masterNode = topologyProvider.getTopology().lookup(master); + RedisClusterNode masterNode = this.topologyProvider.getTopology().lookup(master); - clusterCommandExecutor.executeCommandOnSingleNode( - (JedisClusterCommandCallback) client -> client.clusterReplicate(masterNode.getId()), replica); + JedisClusterCommandCallback command = jedis -> jedis.clusterReplicate(masterNode.getId()); + this.clusterCommandExecutor.executeCommandOnSingleNode(command, replica); } @Override public Integer clusterGetSlotForKey(byte[] key) { - return clusterCommandExecutor - .executeCommandOnArbitraryNode( - (JedisClusterCommandCallback) client -> (int) client.clusterKeySlot(JedisConverters.toString(key))) - .getValue(); + JedisClusterCommandCallback command = jedis -> + Long.valueOf(jedis.clusterKeySlot(JedisConverters.toString(key))).intValue(); + + return this.clusterCommandExecutor.executeCommandOnArbitraryNode(command).getValue(); } @Override public RedisClusterNode clusterGetNodeForKey(byte[] key) { - return topologyProvider.getTopology().getKeyServingMasterNode(key); + return this.topologyProvider.getTopology().getKeyServingMasterNode(key); } - @Override + @Override @Nullable public RedisClusterNode clusterGetNodeForSlot(int slot) { for (RedisClusterNode node : topologyProvider.getTopology().getSlotServingNodes(slot)) { @@ -558,7 +578,7 @@ public class JedisClusterConnection implements RedisClusterConnection { @Override public Set clusterGetNodes() { - return topologyProvider.getTopology().getNodes(); + return this.topologyProvider.getTopology().getNodes(); } @Override @@ -566,21 +586,26 @@ public class JedisClusterConnection implements RedisClusterConnection { Assert.notNull(master, "Master cannot be null"); - RedisClusterNode nodeToUse = topologyProvider.getTopology().lookup(master); + RedisClusterNode nodeToUse = this.topologyProvider.getTopology().lookup(master); - return JedisConverters.toSetOfRedisClusterNodes(clusterCommandExecutor - .executeCommandOnSingleNode( - (JedisClusterCommandCallback>) client -> client.clusterSlaves(nodeToUse.getId()), master) - .getValue()); + JedisClusterCommandCallback> command = jedis -> jedis.clusterSlaves(nodeToUse.getId()); + + List clusterNodes = this.clusterCommandExecutor.executeCommandOnSingleNode(command, master).getValue(); + + return JedisConverters.toSetOfRedisClusterNodes(clusterNodes); } @Override public Map> clusterGetMasterReplicaMap() { - List>> nodeResults = clusterCommandExecutor.executeCommandAsyncOnNodes( - (JedisClusterCommandCallback>) client -> JedisConverters - .toSetOfRedisClusterNodes(client.clusterSlaves(client.clusterMyId())), - topologyProvider.getTopology().getActiveMasterNodes()).getResults(); + JedisClusterCommandCallback> command = jedis -> + JedisConverters.toSetOfRedisClusterNodes(jedis.clusterSlaves(jedis.clusterMyId())); + + Set activeMasterNodes = this.topologyProvider.getTopology().getActiveMasterNodes(); + + List>> nodeResults = + this.clusterCommandExecutor.executeCommandAsyncOnNodes(command,activeMasterNodes) + .getResults(); Map> result = new LinkedHashMap<>(); @@ -594,19 +619,22 @@ public class JedisClusterConnection implements RedisClusterConnection { @Override public ClusterInfo clusterGetClusterInfo() { - return new ClusterInfo(JedisConverters.toProperties(clusterCommandExecutor - .executeCommandOnArbitraryNode((JedisClusterCommandCallback) Jedis::clusterInfo).getValue())); + JedisClusterCommandCallback command = Jedis::clusterInfo; + + String source = this.clusterCommandExecutor.executeCommandOnArbitraryNode(command).getValue(); + + return new ClusterInfo(JedisConverters.toProperties(source)); } /* - * --> Little helpers to make it work + * Little helpers to make it work */ - protected DataAccessException convertJedisAccessException(Exception ex) { + protected DataAccessException convertJedisAccessException(Exception cause) { - DataAccessException translated = EXCEPTION_TRANSLATION.translate(ex); + DataAccessException translated = EXCEPTION_TRANSLATION.translate(cause); - return translated != null ? translated : new RedisSystemException(ex.getMessage(), ex); + return translated != null ? translated : new RedisSystemException(cause.getMessage(), cause); } @Override @@ -615,8 +643,8 @@ public class JedisClusterConnection implements RedisClusterConnection { if (!closed && disposeClusterCommandExecutorOnClose) { try { clusterCommandExecutor.destroy(); - } catch (Exception ex) { - log.warn("Cannot properly close cluster command executor", ex); + } catch (Exception cause) { + log.warn("Cannot properly close cluster command executor", cause); } } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java index 3fdce40c9..88b8a5eb4 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java @@ -98,6 +98,7 @@ import org.springframework.util.StringUtils; * @author Ninad Divadkar * @author Guy Korland * @author dengliming + * @author John Blum */ @SuppressWarnings("ConstantConditions") abstract class JedisConverters extends Converters { @@ -122,7 +123,6 @@ abstract class JedisConverters extends Converters { /** * {@link ListConverter} converting jedis {@link redis.clients.jedis.resps.Tuple} to {@link Tuple}. * - * @return * @since 1.4 */ static ListConverter tuplesToTuples() { @@ -145,7 +145,6 @@ abstract class JedisConverters extends Converters { * Map a {@link Set} of {@link Tuple} by {@code value} to its {@code score}. * * @param tuples must not be {@literal null}. - * @return * @since 2.0 */ public static Map toTupleMap(Set tuples) { @@ -170,8 +169,6 @@ abstract class JedisConverters extends Converters { } /** - * @param source - * @return * @since 1.6 */ public static byte[] toBytes(Double source) { @@ -204,24 +201,23 @@ abstract class JedisConverters extends Converters { } /** - * @param source - * @return * @since 1.7 */ @SuppressWarnings("unchecked") public static RedisClusterNode toNode(Object source) { List values = (List) source; + RedisClusterNode.SlotRange range = new RedisClusterNode.SlotRange(((Number) values.get(0)).intValue(), ((Number) values.get(1)).intValue()); + List nodeInfo = (List) values.get(2); + return new RedisClusterNode(toString((byte[]) nodeInfo.get(0)), ((Number) nodeInfo.get(1)).intValue(), range); } /** - * @param source - * @return * @since 1.3 */ public static List toListOfRedisClientInformation(String source) { @@ -234,8 +230,6 @@ abstract class JedisConverters extends Converters { } /** - * @param source - * @return * @since 1.4 */ public static List toListOfRedisServer(List> source) { @@ -288,6 +282,7 @@ abstract class JedisConverters extends Converters { } public static BitOP toBitOp(BitOperation bitOp) { + return switch (bitOp) { case AND -> BitOP.AND; case OR -> BitOP.OR; @@ -297,12 +292,9 @@ abstract class JedisConverters extends Converters { } /** - * Converts a given {@link Bound} to its binary representation suitable for {@literal ZRANGEBY*} commands, despite - * {@literal ZRANGEBYLEX}. + * Converts a given {@link org.springframework.data.domain.Range.Bound} to its binary representation suitable for + * {@literal ZRANGEBY*} commands, despite {@literal ZRANGEBYLEX}. * - * @param boundary - * @param defaultValue - * @return * @since 1.6 */ public static byte[] boundaryToBytesForZRange(@Nullable org.springframework.data.domain.Range.Bound boundary, @@ -316,10 +308,9 @@ abstract class JedisConverters extends Converters { } /** - * Converts a given {@link Bound} to its binary representation suitable for ZRANGEBYLEX command. + * Converts a given {@link org.springframework.data.domain.Range.Bound} to its binary representation suitable for + * {@literal ZRANGEBYLEX} command. * - * @param boundary - * @return * @since 1.6 */ public static byte[] boundaryToBytesForZRangeByLex( @@ -342,7 +333,6 @@ abstract class JedisConverters extends Converters { * * * @param expiration must not be {@literal null}. - * @return * @since 2.2 */ public static SetParams toSetCommandExPxArgument(Expiration expiration) { @@ -359,8 +349,6 @@ abstract class JedisConverters extends Converters { * * * @param expiration must not be {@literal null}. - * @param params - * @return * @since 2.2 */ public static SetParams toSetCommandExPxArgument(Expiration expiration, SetParams params) { @@ -395,7 +383,6 @@ abstract class JedisConverters extends Converters { * * * @param expiration must not be {@literal null}. - * @return * @since 2.6 */ static GetExParams toGetExParams(Expiration expiration) { @@ -429,7 +416,6 @@ abstract class JedisConverters extends Converters { * * * @param option must not be {@literal null}. - * @return * @since 2.2 */ public static SetParams toSetCommandNxXxArgument(SetOption option) { @@ -448,8 +434,6 @@ abstract class JedisConverters extends Converters { * * * @param option must not be {@literal null}. - * @param params - * @return * @since 2.2 */ public static SetParams toSetCommandNxXxArgument(SetOption option, SetParams params) { @@ -492,9 +476,6 @@ abstract class JedisConverters extends Converters { /** * Convert {@link ScanOptions} to Jedis {@link ScanParams}. - * - * @param options - * @return */ public static ScanParams toScanParams(ScanOptions options) { @@ -523,8 +504,6 @@ abstract class JedisConverters extends Converters { } /** - * @param source - * @return * @since 1.8 */ public static List toStrings(List source) { @@ -547,7 +526,6 @@ abstract class JedisConverters extends Converters { } /** - * @return * @since 1.8 */ public static ListConverter geoCoordinateToPointConverter() { @@ -555,7 +533,6 @@ abstract class JedisConverters extends Converters { } /** - * @return * @since 2.5 */ @Nullable @@ -566,8 +543,6 @@ abstract class JedisConverters extends Converters { /** * Convert {@link Point} into {@link GeoCoordinate}. * - * @param source - * @return * @since 1.8 */ public static GeoCoordinate toGeoCoordinate(Point source) { @@ -577,8 +552,6 @@ abstract class JedisConverters extends Converters { /** * Get a {@link Converter} capable of converting {@link GeoRadiusResponse} into {@link GeoResults}. * - * @param metric - * @return * @since 1.8 */ public static Converter, GeoResults>> geoRadiusResponseToGeoResultsConverter( @@ -589,8 +562,6 @@ abstract class JedisConverters extends Converters { /** * Convert {@link Metric} into {@link GeoUnit}. * - * @param metric - * @return * @since 1.8 */ public static GeoUnit toGeoUnit(Metric metric) { @@ -640,8 +611,6 @@ abstract class JedisConverters extends Converters { /** * Convert {@link GeoRadiusCommandArgs} into {@link GeoRadiusParam}. * - * @param source - * @return * @since 1.8 */ public static GeoRadiusParam toGeoRadiusParam(GeoRadiusCommandArgs source) { @@ -677,9 +646,6 @@ abstract class JedisConverters extends Converters { /** * Convert a timeout to seconds using {@code double} representation including fraction of seconds. * - * @param timeout - * @param unit - * @return * @since 2.6 */ static double toSeconds(long timeout, TimeUnit unit) { @@ -697,7 +663,6 @@ abstract class JedisConverters extends Converters { /** * Convert given {@link BitFieldSubCommands} into argument array. * - * @param source * @return never {@literal null}. * @since 1.8 */ diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java index 2a3ba2c83..8e7e2a12b 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java @@ -62,6 +62,7 @@ import org.springframework.util.ObjectUtils; * * @author Christoph Strobl * @author Mark Paluch + * @author John Blum * @since 1.7 */ public class LettuceClusterConnection extends LettuceConnection @@ -70,7 +71,14 @@ public class LettuceClusterConnection extends LettuceConnection static final ExceptionTranslationStrategy exceptionConverter = new PassThroughExceptionTranslationStrategy( LettuceExceptionConverter.INSTANCE); + private boolean disposeClusterCommandExecutorOnClose; + + private ClusterCommandExecutor clusterCommandExecutor; + + private ClusterTopologyProvider topologyProvider; + private final Log log = LogFactory.getLog(getClass()); + private final LettuceClusterGeoCommands geoCommands = new LettuceClusterGeoCommands(this); private final LettuceClusterHashCommands hashCommands = new LettuceClusterHashCommands(this); private final LettuceClusterHyperLogLogCommands hllCommands = new LettuceClusterHyperLogLogCommands(this); @@ -81,10 +89,6 @@ public class LettuceClusterConnection extends LettuceConnection private final LettuceClusterZSetCommands zSetCommands = new LettuceClusterZSetCommands(this); private final LettuceClusterServerCommands serverCommands = new LettuceClusterServerCommands(this); - private ClusterCommandExecutor clusterCommandExecutor; - private ClusterTopologyProvider topologyProvider; - private boolean disposeClusterCommandExecutorOnClose; - /** * Creates new {@link LettuceClusterConnection} using {@link RedisClusterClient} with default * {@link RedisURI#DEFAULT_TIMEOUT_DURATION timeout} and a fresh {@link ClusterCommandExecutor} that gets destroyed on @@ -206,12 +210,14 @@ public class LettuceClusterConnection extends LettuceConnection LettuceConnectionProvider connectionProvider = getConnectionProvider(); - if (connectionProvider instanceof RedisClientProvider) { - return (RedisClusterClient) ((RedisClientProvider) getConnectionProvider()).getRedisClient(); + if (connectionProvider instanceof RedisClientProvider redisClientProvider) { + return (RedisClusterClient) redisClientProvider.getRedisClient(); } - throw new IllegalStateException(String.format("Connection provider %s does not implement RedisClientProvider", - connectionProvider.getClass().getName())); + String message = String.format("Connection provider %s does not implement RedisClientProvider", + connectionProvider.getClass().getName()); + + throw new IllegalStateException(message); } @Override @@ -266,8 +272,8 @@ public class LettuceClusterConnection extends LettuceConnection @Override public String ping() { - Collection ping = clusterCommandExecutor - .executeCommandOnAllNodes((LettuceClusterCommandCallback) BaseRedisCommands::ping).resultsAsList(); + + Collection ping = this.clusterCommandExecutor.executeCommandOnAllNodes(pingCommand()).resultsAsList(); for (String result : ping) { if (!ObjectUtils.nullSafeEquals("PONG", result)) { @@ -280,14 +286,16 @@ public class LettuceClusterConnection extends LettuceConnection @Override public String ping(RedisClusterNode node) { + return this.clusterCommandExecutor.executeCommandOnSingleNode(pingCommand(), node).getValue(); + } - return clusterCommandExecutor - .executeCommandOnSingleNode((LettuceClusterCommandCallback) BaseRedisCommands::ping, node).getValue(); + private LettuceClusterCommandCallback pingCommand() { + return BaseRedisCommands::ping; } @Override public List clusterGetNodes() { - return new ArrayList<>(topologyProvider.getTopology().getNodes()); + return new ArrayList<>(this.topologyProvider.getTopology().getNodes()); } @Override @@ -295,21 +303,24 @@ public class LettuceClusterConnection extends LettuceConnection Assert.notNull(master, "Master must not be null"); - RedisClusterNode nodeToUse = topologyProvider.getTopology().lookup(master); + RedisClusterNode nodeToUse = this.topologyProvider.getTopology().lookup(master); - return clusterCommandExecutor - .executeCommandOnSingleNode((LettuceClusterCommandCallback>) client -> LettuceConverters - .toSetOfRedisClusterNodes(client.clusterSlaves(nodeToUse.getId())), master) - .getValue(); + LettuceClusterCommandCallback> command = client -> + LettuceConverters.toSetOfRedisClusterNodes(client.clusterSlaves(nodeToUse.getId())); + + return this.clusterCommandExecutor.executeCommandOnSingleNode(command, master).getValue(); } @Override public Map> clusterGetMasterReplicaMap() { - List>> nodeResults = clusterCommandExecutor.executeCommandAsyncOnNodes( - (LettuceClusterCommandCallback>) client -> Converters - .toSetOfRedisClusterNodes(client.clusterSlaves(client.clusterMyId())), - topologyProvider.getTopology().getActiveMasterNodes()).getResults(); + Set activeMasterNodes = this.topologyProvider.getTopology().getActiveMasterNodes(); + + LettuceClusterCommandCallback> command = client -> + Converters.toSetOfRedisClusterNodes(client.clusterSlaves(client.clusterMyId())); + + List>> nodeResults = + this.clusterCommandExecutor.executeCommandAsyncOnNodes(command,activeMasterNodes).getResults(); Map> result = new LinkedHashMap<>(); @@ -325,14 +336,13 @@ public class LettuceClusterConnection extends LettuceConnection return SlotHash.getSlot(key); } + @Nullable @Override public RedisClusterNode clusterGetNodeForSlot(int slot) { Set nodes = topologyProvider.getTopology().getSlotServingNodes(slot); - if (nodes.isEmpty()) { - return null; - } - return nodes.iterator().next(); + + return !nodes.isEmpty() ? nodes.iterator().next() : null; } @Override @@ -343,17 +353,18 @@ public class LettuceClusterConnection extends LettuceConnection @Override public ClusterInfo clusterGetClusterInfo() { - return clusterCommandExecutor - .executeCommandOnArbitraryNode((LettuceClusterCommandCallback) client -> new ClusterInfo( - LettuceConverters.toProperties(client.clusterInfo()))) - .getValue(); + LettuceClusterCommandCallback command = client -> + new ClusterInfo(LettuceConverters.toProperties(client.clusterInfo())); + + return this.clusterCommandExecutor.executeCommandOnArbitraryNode(command).getValue(); } @Override public void clusterAddSlots(RedisClusterNode node, int... slots) { - clusterCommandExecutor.executeCommandOnSingleNode( - (LettuceClusterCommandCallback) client -> client.clusterAddSlots(slots), node); + LettuceClusterCommandCallback command = client -> client.clusterAddSlots(slots); + + this.clusterCommandExecutor.executeCommandOnSingleNode(command, node); } @Override @@ -369,15 +380,17 @@ public class LettuceClusterConnection extends LettuceConnection try { return getConnection().clusterCountKeysInSlot(slot); - } catch (Exception ex) { - throw exceptionConverter.translate(ex); + } catch (Exception cause) { + throw this.exceptionConverter.translate(cause); } } @Override public void clusterDeleteSlots(RedisClusterNode node, int... slots) { - clusterCommandExecutor.executeCommandOnSingleNode( - (LettuceClusterCommandCallback) client -> client.clusterDelSlots(slots), node); + + LettuceClusterCommandCallback command = client -> client.clusterDelSlots(slots); + + this.clusterCommandExecutor.executeCommandOnSingleNode(command, node); } @Override @@ -393,21 +406,25 @@ public class LettuceClusterConnection extends LettuceConnection List nodes = new ArrayList<>(clusterGetNodes()); RedisClusterNode nodeToRemove = topologyProvider.getTopology().lookup(node); + nodes.remove(nodeToRemove); - this.clusterCommandExecutor.executeCommandAsyncOnNodes( - (LettuceClusterCommandCallback) client -> client.clusterForget(nodeToRemove.getId()), nodes); + LettuceClusterCommandCallback command = client -> client.clusterForget(nodeToRemove.getId()); + + this.clusterCommandExecutor.executeCommandAsyncOnNodes(command, nodes); } @Override + @SuppressWarnings("all") public void clusterMeet(RedisClusterNode node) { Assert.notNull(node, "Cluster node must not be null for CLUSTER MEET command"); Assert.hasText(node.getHost(), "Node to meet cluster must have a host"); Assert.isTrue(node.getPort() > 0, "Node to meet cluster must have a port greater 0"); - this.clusterCommandExecutor.executeCommandOnAllNodes( - (LettuceClusterCommandCallback) client -> client.clusterMeet(node.getHost(), node.getPort())); + LettuceClusterCommandCallback command = client -> client.clusterMeet(node.getHost(), node.getPort()); + + this.clusterCommandExecutor.executeCommandOnAllNodes(command); } @Override @@ -419,12 +436,14 @@ public class LettuceClusterConnection extends LettuceConnection RedisClusterNode nodeToUse = topologyProvider.getTopology().lookup(node); String nodeId = nodeToUse.getId(); - clusterCommandExecutor.executeCommandOnSingleNode((LettuceClusterCommandCallback) client -> switch (mode) { + LettuceClusterCommandCallback command = client -> switch (mode) { case MIGRATING -> client.clusterSetSlotMigrating(slot, nodeId); case IMPORTING -> client.clusterSetSlotImporting(slot, nodeId); case NODE -> client.clusterSetSlotNode(slot, nodeId); case STABLE -> client.clusterSetSlotStable(slot); - }, node); + }; + + this.clusterCommandExecutor.executeCommandOnSingleNode(command, node); } @Override @@ -432,17 +451,19 @@ public class LettuceClusterConnection extends LettuceConnection try { return getConnection().clusterGetKeysInSlot(slot, count); - } catch (Exception ex) { - throw exceptionConverter.translate(ex); + } catch (Exception cause) { + throw this.exceptionConverter.translate(cause); } } @Override public void clusterReplicate(RedisClusterNode master, RedisClusterNode replica) { - RedisClusterNode masterNode = topologyProvider.getTopology().lookup(master); - clusterCommandExecutor.executeCommandOnSingleNode( - (LettuceClusterCommandCallback) client -> client.clusterReplicate(masterNode.getId()), replica); + RedisClusterNode masterNode = this.topologyProvider.getTopology().lookup(master); + + LettuceClusterCommandCallback command = client -> client.clusterReplicate(masterNode.getId()); + + this.clusterCommandExecutor.executeCommandOnSingleNode(command, replica); } @Override @@ -485,7 +506,7 @@ public class LettuceClusterConnection extends LettuceConnection } public ClusterCommandExecutor getClusterCommandExecutor() { - return clusterCommandExecutor; + return this.clusterCommandExecutor; } @Override @@ -543,25 +564,24 @@ public class LettuceClusterConnection extends LettuceConnection Assert.notNull(node, "Node must not be null"); - if (connection == null) { + if (this.connection == null) { synchronized (this) { - if (connection == null) { - this.connection = connectionProvider.getConnection(StatefulRedisClusterConnection.class); + if (this.connection == null) { + this.connection = this.connectionProvider.getConnection(StatefulRedisClusterConnection.class); } } } - return connection.getConnection(node.getHost(), node.getPort()).sync(); + return this.connection.getConnection(node.getHost(), node.getPort()).sync(); } @Override - @SuppressWarnings("unchecked") public void returnResourceForSpecificNode(RedisClusterNode node, Object resource) {} @Override public void destroy() throws Exception { - if (connection != null) { - connectionProvider.release(connection); + if (this.connection != null) { + this.connectionProvider.release(this.connection); } } } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java index cb5ba59ba..899099559 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java @@ -37,7 +37,6 @@ import org.springframework.data.geo.Point; import org.springframework.data.redis.connection.*; import org.springframework.data.redis.connection.BitFieldSubCommands.BitFieldGet; import org.springframework.data.redis.connection.BitFieldSubCommands.BitFieldIncrBy; -import org.springframework.data.redis.connection.BitFieldSubCommands.BitFieldIncrBy.Overflow; import org.springframework.data.redis.connection.BitFieldSubCommands.BitFieldSet; import org.springframework.data.redis.connection.BitFieldSubCommands.BitFieldSubCommand; import org.springframework.data.redis.connection.Limit; @@ -79,6 +78,7 @@ import org.springframework.util.StringUtils; * @author dengliming * @author Chris Bono * @author Vikas Garg + * @author John Blum */ @SuppressWarnings("ConstantConditions") public abstract class LettuceConverters extends Converters { @@ -92,7 +92,6 @@ public abstract class LettuceConverters extends Converters { private static final long INDEXED_RANGE_END = -1; static { - PLUS_BYTES = toBytes("+"); MINUS_BYTES = toBytes("-"); POSITIVE_INFINITY_BYTES = toBytes("+inf"); @@ -100,7 +99,9 @@ public abstract class LettuceConverters extends Converters { } public static Point geoCoordinatesToPoint(@Nullable GeoCoordinates geoCoordinate) { - return geoCoordinate != null ? new Point(geoCoordinate.getX().doubleValue(), geoCoordinate.getY().doubleValue()) + + return geoCoordinate != null + ? new Point(geoCoordinate.getX().doubleValue(), geoCoordinate.getY().doubleValue()) : null; } @@ -109,15 +110,19 @@ public abstract class LettuceConverters extends Converters { } public static Converter>, List> scoredValuesToTupleList() { + return source -> { if (source == null) { return null; } + List tuples = new ArrayList<>(source.size()); + for (ScoredValue value : source) { tuples.add(LettuceConverters.toTuple(value)); } + return tuples; }; } @@ -127,8 +132,7 @@ public abstract class LettuceConverters extends Converters { } /** - * @return - * @sice 1.3 + * @since 1.3 */ public static Converter longToBooleanConverter() { return Converters::toBoolean; @@ -147,7 +151,9 @@ public abstract class LettuceConverters extends Converters { if (source == null) { return null; } + List list = new ArrayList<>(2); + list.add(source.getKey()); list.add(source.getValue()); @@ -155,56 +161,63 @@ public abstract class LettuceConverters extends Converters { } public static List toBytesList(Collection source) { + if (source instanceof List) { return (List) source; } + return source != null ? new ArrayList<>(source) : null; } public static Tuple toTuple(@Nullable ScoredValue source) { - return source != null && source.hasValue() ? new DefaultTuple(source.getValue(), Double.valueOf(source.getScore())) + + return source != null && source.hasValue() + ? new DefaultTuple(source.getValue(), Double.valueOf(source.getScore())) : null; } public static String toString(@Nullable byte[] source) { + if (source == null || Arrays.equals(source, new byte[0])) { return null; } + return new String(source); } public static ScriptOutputType toScriptOutputType(ReturnType returnType) { + Assert.notNull(returnType, () -> "Return type " + returnType + " is not a supported script output type"); + return switch (returnType) { case BOOLEAN -> ScriptOutputType.BOOLEAN; case MULTI -> ScriptOutputType.MULTI; case VALUE -> ScriptOutputType.VALUE; case INTEGER -> ScriptOutputType.INTEGER; case STATUS -> ScriptOutputType.STATUS; - default -> - throw new IllegalArgumentException("Return type " + returnType + " is not a supported script output type"); }; } public static boolean toBoolean(Position where) { Assert.notNull(where, "list positions are mandatory"); - return (Position.AFTER.equals(where) ? false : true); + return !Position.AFTER.equals(where); } public static int toInt(boolean value) { - return (value ? 1 : 0); + return value ? 1 : 0; } public static Map toMap(List source) { + if (CollectionUtils.isEmpty(source)) { return Collections.emptyMap(); } Map target = new LinkedHashMap<>(); + Iterator keyValue = source.iterator(); - Iterator kv = source.iterator(); - while (kv.hasNext()) { - target.put(kv.next(), kv.hasNext() ? kv.next() : null); + while (keyValue.hasNext()) { + target.put(keyValue.next(), keyValue.hasNext() ? keyValue.next() : null); } return target; @@ -213,6 +226,7 @@ public abstract class LettuceConverters extends Converters { public static SortArgs toSortArgs(SortParameters params) { SortArgs args = new SortArgs(); + if (params == null) { return args; } @@ -235,10 +249,13 @@ public abstract class LettuceConverters extends Converters { args.desc(); } } + Boolean isAlpha = params.isAlphabetic(); + if (isAlpha != null && isAlpha) { args.alpha(); } + return args; } @@ -254,7 +271,6 @@ public abstract class LettuceConverters extends Converters { /** * Convert a {@link Limit} to a Lettuce {@link io.lettuce.core.Limit}. * - * @param limit * @return a lettuce {@link io.lettuce.core.Limit}. * @since 2.0 */ @@ -266,8 +282,6 @@ public abstract class LettuceConverters extends Converters { /** * Convert a {@link org.springframework.data.redis.connection.RedisZSetCommands.Range} to a lettuce {@link Range}. * - * @param range - * @return * @since 2.0 */ public static Range toRange(org.springframework.data.domain.Range range) { @@ -277,9 +291,6 @@ public abstract class LettuceConverters extends Converters { /** * Convert a {@link org.springframework.data.domain.Range} to a lettuce {@link Range}. * - * @param range - * @param convertNumberToBytes - * @return * @since 2.2 */ @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -287,6 +298,7 @@ public abstract class LettuceConverters extends Converters { Range.Boundary upper = RangeConverter.convertBound(range.getUpperBound(), convertNumberToBytes, null, it -> it.getBytes(StandardCharsets.UTF_8)); + Range.Boundary lower = RangeConverter.convertBound(range.getLowerBound(), convertNumberToBytes, null, it -> it.getBytes(StandardCharsets.UTF_8)); @@ -296,8 +308,6 @@ public abstract class LettuceConverters extends Converters { /** * Convert a {@link org.springframework.data.domain.Range} to a lettuce {@link Range} and reverse boundaries. * - * @param range - * @return * @since 2.0 */ @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -305,6 +315,7 @@ public abstract class LettuceConverters extends Converters { Range.Boundary upper = RangeConverter.convertBound(range.getUpperBound(), false, null, it -> it.getBytes(StandardCharsets.UTF_8)); + Range.Boundary lower = RangeConverter.convertBound(range.getLowerBound(), false, null, it -> it.getBytes(StandardCharsets.UTF_8)); @@ -324,9 +335,11 @@ public abstract class LettuceConverters extends Converters { } List sentinels = new ArrayList<>(); + for (Map info : source) { sentinels.add(RedisServer.newServerFrom(Converters.toProperties(info))); } + return sentinels; } @@ -343,6 +356,7 @@ public abstract class LettuceConverters extends Converters { Set sentinels = sentinelConfiguration.getSentinels(); RedisPassword sentinelPassword = sentinelConfiguration.getSentinelPassword(); RedisURI.Builder builder = RedisURI.builder(); + for (RedisNode sentinel : sentinels) { RedisURI.Builder sentinelBuilder = RedisURI.Builder.redis(sentinel.getHost(), sentinel.getPort()); @@ -383,6 +397,7 @@ public abstract class LettuceConverters extends Converters { static RedisStandaloneConfiguration createRedisStandaloneConfiguration(RedisURI redisURI) { RedisStandaloneConfiguration standaloneConfiguration = new RedisStandaloneConfiguration(); + standaloneConfiguration.setHostName(redisURI.getHost()); standaloneConfiguration.setPort(redisURI.getPort()); standaloneConfiguration.setDatabase(redisURI.getDatabase()); @@ -402,6 +417,7 @@ public abstract class LettuceConverters extends Converters { static RedisSocketConfiguration createRedisSocketConfiguration(RedisURI redisURI) { RedisSocketConfiguration socketConfiguration = new RedisSocketConfiguration(); + socketConfiguration.setSocket(redisURI.getSocket()); socketConfiguration.setDatabase(redisURI.getDatabase()); @@ -420,16 +436,21 @@ public abstract class LettuceConverters extends Converters { static RedisSentinelConfiguration createRedisSentinelConfiguration(RedisURI redisURI) { RedisSentinelConfiguration sentinelConfiguration = new RedisSentinelConfiguration(); + if (!ObjectUtils.isEmpty(redisURI.getSentinelMasterId())) { sentinelConfiguration.setMaster(redisURI.getSentinelMasterId()); } + sentinelConfiguration.setDatabase(redisURI.getDatabase()); for (RedisURI sentinelNodeRedisUri : redisURI.getSentinels()) { + RedisNode sentinelNode = new RedisNode(sentinelNodeRedisUri.getHost(), sentinelNodeRedisUri.getPort()); + if (sentinelNodeRedisUri.getPassword() != null) { sentinelConfiguration.setSentinelPassword(sentinelNodeRedisUri.getPassword()); } + sentinelConfiguration.addSentinel(sentinelNode); } @@ -450,10 +471,7 @@ public abstract class LettuceConverters extends Converters { } public static byte[] toBytes(@Nullable String source) { - if (source == null) { - return null; - } - return source.getBytes(); + return source != null ? source.getBytes() : null; } public static byte[] toBytes(Integer source) { @@ -465,8 +483,6 @@ public abstract class LettuceConverters extends Converters { } /** - * @param source - * @return * @since 1.6 */ public static byte[] toBytes(Double source) { @@ -489,8 +505,6 @@ public abstract class LettuceConverters extends Converters { } /** - * @param source - * @return * @since 1.7 */ public static RedisClusterNode toRedisClusterNode(io.lettuce.core.cluster.models.partitions.RedisClusterNode source) { @@ -507,18 +521,22 @@ public abstract class LettuceConverters extends Converters { private static Set parseFlags(@Nullable Set source) { Set flags = new LinkedHashSet<>(source != null ? source.size() : 8, 1); + for (NodeFlag flag : source) { - switch (flag) { - case NOFLAGS -> flags.add(Flag.NOFLAGS); - case EVENTUAL_FAIL -> flags.add(Flag.PFAIL); - case FAIL -> flags.add(Flag.FAIL); - case HANDSHAKE -> flags.add(Flag.HANDSHAKE); - case MASTER -> flags.add(Flag.MASTER); - case MYSELF -> flags.add(Flag.MYSELF); - case NOADDR -> flags.add(Flag.NOADDR); - case SLAVE, REPLICA -> flags.add(Flag.REPLICA); + if (Objects.nonNull(flag)) { + switch (flag) { + case NOFLAGS -> flags.add(Flag.NOFLAGS); + case EVENTUAL_FAIL -> flags.add(Flag.PFAIL); + case FAIL -> flags.add(Flag.FAIL); + case HANDSHAKE -> flags.add(Flag.HANDSHAKE); + case MASTER -> flags.add(Flag.MASTER); + case MYSELF -> flags.add(Flag.MYSELF); + case NOADDR -> flags.add(Flag.NOADDR); + case SLAVE, REPLICA -> flags.add(Flag.REPLICA); + } } } + return flags; } @@ -559,12 +577,12 @@ public abstract class LettuceConverters extends Converters { } if (option != null) { - switch (option) { case SET_IF_ABSENT -> args.nx(); case SET_IF_PRESENT -> args.xx(); } } + return args; } @@ -572,7 +590,6 @@ public abstract class LettuceConverters extends Converters { * Convert {@link Expiration} to {@link GetExArgs}. * * @param expiration can be {@literal null}. - * @return * @since 2.6 */ static GetExArgs toGetExArgs(@Nullable Expiration expiration) { @@ -613,22 +630,19 @@ public abstract class LettuceConverters extends Converters { /** * Convert {@link Metric} into {@link GeoArgs.Unit}. * - * @param metric - * @return * @since 1.8 */ public static GeoArgs.Unit toGeoArgsUnit(Metric metric) { - Metric metricToUse = metric == null || ObjectUtils.nullSafeEquals(Metrics.NEUTRAL, metric) ? DistanceUnit.METERS - : metric; + Metric metricToUse = metric == null + || ObjectUtils.nullSafeEquals(Metrics.NEUTRAL, metric) ? DistanceUnit.METERS : metric; + return ObjectUtils.caseInsensitiveValueOf(GeoArgs.Unit.values(), metricToUse.getAbbreviation()); } /** * Convert {@link GeoRadiusCommandArgs} into {@link GeoArgs}. * - * @param args - * @return * @since 1.8 */ public static GeoArgs toGeoArgs(GeoRadiusCommandArgs args) { @@ -638,8 +652,6 @@ public abstract class LettuceConverters extends Converters { /** * Convert {@link GeoCommandArgs} into {@link GeoArgs}. * - * @param args - * @return * @since 2.6 */ public static GeoArgs toGeoArgs(GeoCommandArgs args) { @@ -673,8 +685,6 @@ public abstract class LettuceConverters extends Converters { /** * Convert {@link BitFieldSubCommands} into {@link BitFieldArgs}. * - * @param subCommands - * @return * @since 2.1 */ public static BitFieldArgs toBitFieldArgs(BitFieldSubCommands subCommands) { @@ -683,36 +693,34 @@ public abstract class LettuceConverters extends Converters { for (BitFieldSubCommand subCommand : subCommands) { - BitFieldArgs.BitFieldType bft = subCommand.getType().isSigned() + BitFieldArgs.BitFieldType bitFieldType = subCommand.getType().isSigned() ? BitFieldArgs.signed(subCommand.getType().getBits()) : BitFieldArgs.unsigned(subCommand.getType().getBits()); - BitFieldArgs.Offset offset; - if (subCommand.getOffset().isZeroBased()) { - offset = BitFieldArgs.offset((int) subCommand.getOffset().getValue()); - } else { - offset = BitFieldArgs.typeWidthBasedOffset((int) subCommand.getOffset().getValue()); - } + BitFieldArgs.Offset offset = subCommand.getOffset().isZeroBased() + ? BitFieldArgs.offset((int) subCommand.getOffset().getValue()) + : BitFieldArgs.typeWidthBasedOffset((int) subCommand.getOffset().getValue()); if (subCommand instanceof BitFieldGet) { - args = args.get(bft, offset); + args = args.get(bitFieldType, offset); } else if (subCommand instanceof BitFieldSet) { - args = args.set(bft, offset, ((BitFieldSet) subCommand).getValue()); + args = args.set(bitFieldType, offset, ((BitFieldSet) subCommand).getValue()); } else if (subCommand instanceof BitFieldIncrBy) { BitFieldIncrBy.Overflow overflow = ((BitFieldIncrBy) subCommand).getOverflow(); + if (overflow != null) { BitFieldArgs.OverflowType type = switch (overflow) { case SAT -> BitFieldArgs.OverflowType.SAT; case FAIL -> BitFieldArgs.OverflowType.FAIL; case WRAP -> BitFieldArgs.OverflowType.WRAP; - }; + }; args = args.overflow(type); } - args = args.incrBy(bft, (int) subCommand.getOffset().getValue(), ((BitFieldIncrBy) subCommand).getValue()); + args = args.incrBy(bitFieldType, (int) subCommand.getOffset().getValue(), ((BitFieldIncrBy) subCommand).getValue()); } } @@ -736,6 +744,7 @@ public abstract class LettuceConverters extends Converters { KeyScanArgs scanArgs = new KeyScanArgs(); byte[] pattern = options.getBytePattern(); + if (pattern != null) { scanArgs.match(pattern); } @@ -754,7 +763,6 @@ public abstract class LettuceConverters extends Converters { /** * Get {@link Converter} capable of {@link Set} of {@link Byte} into {@link GeoResults}. * - * @return * @since 1.8 */ public static Converter, GeoResults>> bytesSetToGeoResultsConverter() { @@ -767,9 +775,11 @@ public abstract class LettuceConverters extends Converters { List>> results = new ArrayList<>(source.size()); Iterator it = source.iterator(); + while (it.hasNext()) { results.add(new GeoResult<>(new GeoLocation<>(it.next(), null), new Distance(0D))); } + return new GeoResults<>(results); }; } @@ -777,8 +787,6 @@ public abstract class LettuceConverters extends Converters { /** * Get {@link Converter} capable of convering {@link GeoWithin} into {@link GeoResults}. * - * @param metric - * @return * @since 1.8 */ public static Converter>, GeoResults>> geoRadiusResponseToGeoResultsConverter( @@ -793,9 +801,6 @@ public abstract class LettuceConverters extends Converters { /** * Return {@link Optional} lower bound from {@link Range}. * - * @param range - * @param - * @return * @since 2.0.9 */ static > Optional getLowerBound(org.springframework.data.domain.Range range) { @@ -805,9 +810,6 @@ public abstract class LettuceConverters extends Converters { /** * Return {@link Optional} upper bound from {@link Range}. * - * @param range - * @param - * @return * @since 2.0.9 */ static > Optional getUpperBound(org.springframework.data.domain.Range range) { @@ -818,7 +820,6 @@ public abstract class LettuceConverters extends Converters { * Return the lower bound index from {@link Range} or {@literal 0} (zero) if the lower range is not bounded to point * to the first element. To be used with index-based commands such as {@code LRANGE}, {@code GETRANGE}. * - * @param range * @return the lower index bound value or {@literal 0} for the first element if not bounded. * @since 2.0.9 */ @@ -830,7 +831,6 @@ public abstract class LettuceConverters extends Converters { * Return the upper bound index from {@link Range} or {@literal -1} (minus one) if the upper range is not bounded to * point to the last element. To be used with index-based commands such as {@code LRANGE}, {@code GETRANGE}. * - * @param range * @return the upper index bound value or {@literal -1} for the last element if not bounded. * @since 2.0.9 */ @@ -866,6 +866,7 @@ public abstract class LettuceConverters extends Converters { BoxShape boxPredicate = (BoxShape) predicate; BoundingBox boundingBox = boxPredicate.getBoundingBox(); + return GeoSearch.byBox(boundingBox.getWidth().getValue(), boundingBox.getHeight().getValue(), toGeoArgsUnit(boxPredicate.getMetric())); } @@ -882,6 +883,7 @@ public abstract class LettuceConverters extends Converters { if (reference instanceof GeoReference.GeoCoordinateReference) { GeoCoordinateReference coordinates = (GeoCoordinateReference) reference; + return GeoSearch.fromCoordinates(coordinates.getLongitude(), coordinates.getLatitude()); } @@ -897,26 +899,26 @@ public abstract class LettuceConverters extends Converters { return switch (option) { case ASYNC -> FlushMode.ASYNC; case SYNC -> FlushMode.SYNC; - }; + }; } /** * @author Christoph Strobl * @since 1.8 */ - static enum GeoResultsConverterFactory { + enum GeoResultsConverterFactory { INSTANCE; Converter>, GeoResults>> forMetric(Metric metric) { - return new GeoResultsConverter( - metric == null || ObjectUtils.nullSafeEquals(Metrics.NEUTRAL, metric) ? DistanceUnit.METERS : metric); + return new GeoResultsConverter(metric == null + || ObjectUtils.nullSafeEquals(Metrics.NEUTRAL, metric) ? DistanceUnit.METERS : metric); } private static class GeoResultsConverter implements Converter>, GeoResults>> { - private Metric metric; + private final Metric metric; public GeoResultsConverter(Metric metric) { this.metric = metric; @@ -942,7 +944,7 @@ public abstract class LettuceConverters extends Converters { * @author Christoph Strobl * @since 1.8 */ - static enum GeoResultConverterFactory { + enum GeoResultConverterFactory { INSTANCE; @@ -952,7 +954,7 @@ public abstract class LettuceConverters extends Converters { private static class GeoResultConverter implements Converter, GeoResult>> { - private Metric metric; + private final Metric metric; public GeoResultConverter(Metric metric) { this.metric = metric; diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisClusterConnection.java index aa8c9ce01..4a3bb81d0 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisClusterConnection.java @@ -36,7 +36,6 @@ import java.util.Random; import java.util.Set; import java.util.stream.Collectors; -import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.ClusterTopologyProvider; import org.springframework.data.redis.connection.ReactiveRedisClusterConnection; @@ -83,7 +82,6 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti * @throws IllegalArgumentException when {@code client} is {@literal null}. * @since 2.0.1 */ - @SuppressWarnings("unchecked") LettuceReactiveRedisClusterConnection(StatefulConnection sharedConnection, LettuceConnectionProvider connectionProvider, RedisClusterClient client) { @@ -278,18 +276,17 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti Assert.notNull(node, "Node must not be null"); Assert.notNull(mode, "AddSlots mode must not be null"); - return execute(node, cmd -> { + return execute(node, commands -> { RedisClusterNode nodeToUse = lookup(node); String nodeId = nodeToUse.getId(); return switch (mode) { - case MIGRATING -> cmd.clusterSetSlotMigrating(slot, nodeId); - case IMPORTING -> cmd.clusterSetSlotImporting(slot, nodeId); - case NODE -> cmd.clusterSetSlotNode(slot, nodeId); - case STABLE -> cmd.clusterSetSlotStable(slot); - }; - + case MIGRATING -> commands.clusterSetSlotMigrating(slot, nodeId); + case IMPORTING -> commands.clusterSetSlotImporting(slot, nodeId); + case NODE -> commands.clusterSetSlotNode(slot, nodeId); + case STABLE -> commands.clusterSetSlotStable(slot); + }; }).then(); } @@ -349,7 +346,7 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti return getConnection().map(StatefulRedisClusterConnection::reactive); } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({ "unchecked" }) protected Mono> getCommands(RedisNode node) { if (StringUtils.hasText(node.getId())) { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStringCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStringCommands.java index 180f5e63c..77021a0d0 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStringCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStringCommands.java @@ -17,13 +17,11 @@ package org.springframework.data.redis.connection.lettuce; import io.lettuce.core.BitFieldArgs; import io.lettuce.core.GetExArgs; -import io.lettuce.core.KeyValue; import io.lettuce.core.SetArgs; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -38,13 +36,18 @@ import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiVa import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; import org.springframework.data.redis.connection.ReactiveRedisConnection.RangeCommand; import org.springframework.data.redis.connection.ReactiveStringCommands; +import org.springframework.data.redis.connection.RedisStringCommands; +import org.springframework.data.redis.core.types.Expiration; import org.springframework.util.Assert; /** + * {@link ReactiveStringCommands} implemented using {@literal Lettuce}. + * * @author Christoph Strobl * @author Mark Paluch * @author Jiahe Cai * @author Michele Mancioppi + * @author John Blum * @since 2.0 */ class LettuceReactiveStringCommands implements ReactiveStringCommands { @@ -66,20 +69,20 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands { @Override public Flux, ByteBuffer>> mGet(Publisher> keyCollections) { - return connection.execute(cmd -> Flux.from(keyCollections).concatMap((keys) -> { + return this.connection.execute(reactiveCommands -> Flux.from(keyCollections).concatMap((keys) -> { Assert.notNull(keys, "Keys must not be null"); - return cmd.mget(keys.toArray(new ByteBuffer[0])).collectList().map((value) -> { - return value.stream().map(keyValue -> keyValue.getValueOrElse(null)).collect(Collectors.toList()); - }).map((values) -> new MultiValueResponse<>(keys, values)); + return reactiveCommands.mget(keys.toArray(new ByteBuffer[0])).collectList() + .map(value -> value.stream().map(keyValue -> keyValue.getValueOrElse(null)).collect(Collectors.toList())) + .map(values -> new MultiValueResponse<>(keys, values)); })); } @Override public Flux> set(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap((command) -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getValue(), "Value must not be null"); @@ -87,12 +90,16 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands { SetArgs args = null; if (command.getExpiration().isPresent() || command.getOption().isPresent()) { - args = LettuceConverters.toSetArgs(command.getExpiration().isPresent() ? command.getExpiration().get() : null, - command.getOption().isPresent() ? command.getOption().get() : null); + + Expiration expiration = command.getExpiration().orElse(null); + RedisStringCommands.SetOption setOption = command.getOption().orElse(null); + + args = LettuceConverters.toSetArgs(expiration, setOption); } - Mono mono = args != null ? cmd.set(command.getKey(), command.getValue(), args) - : cmd.set(command.getKey(), command.getValue()); + Mono mono = args != null ? reactiveCommands.set(command.getKey(), command.getValue(), args) + : reactiveCommands.set(command.getKey(), command.getValue()); + return mono.map(LettuceConverters::stringToBoolean).map(value -> new BooleanResponse<>(command, value)) .switchIfEmpty(Mono.just(new BooleanResponse<>(command, Boolean.FALSE))); })); @@ -101,16 +108,17 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands { @Override public Flux> getSet(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap((command) -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getValue(), "Value must not be null"); if (command.getExpiration().isPresent() || command.getOption().isPresent()) { - throw new IllegalArgumentException("Command must not define expiration nor option for GETSET."); + throw new IllegalArgumentException("Command must not define expiration nor option for GETSET"); } - return cmd.getset(command.getKey(), command.getValue()).map((value) -> new ByteBufferResponse<>(command, value)) + return reactiveCommands.getset(command.getKey(), command.getValue()) + .map(value -> new ByteBufferResponse<>(command, value)) .defaultIfEmpty(new AbsentByteBufferResponse<>(command)); })); } @@ -118,11 +126,12 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands { @Override public Flux> get(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap((command) -> { Assert.notNull(command.getKey(), "Key must not be null"); - return cmd.get(command.getKey()).map((value) -> new ByteBufferResponse<>(command, value)) + return reactiveCommands.get(command.getKey()) + .map(value -> new ByteBufferResponse<>(command, value)) .defaultIfEmpty(new AbsentByteBufferResponse<>(command)); })); } @@ -130,11 +139,12 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands { @Override public Flux> getDel(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap((command) -> { Assert.notNull(command.getKey(), "Key must not be null"); - return cmd.getdel(command.getKey()).map((value) -> new ByteBufferResponse<>(command, value)) + return reactiveCommands.getdel(command.getKey()) + .map(value -> new ByteBufferResponse<>(command, value)) .defaultIfEmpty(new AbsentByteBufferResponse<>(command)); })); } @@ -142,13 +152,14 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands { @Override public Flux> getEx(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap((command) -> { Assert.notNull(command.getKey(), "Key must not be null"); GetExArgs args = LettuceConverters.toGetExArgs(command.getExpiration()); - return cmd.getex(command.getKey(), args).map((value) -> new ByteBufferResponse<>(command, value)) + return reactiveCommands.getex(command.getKey(), args) + .map(value -> new ByteBufferResponse<>(command, value)) .defaultIfEmpty(new AbsentByteBufferResponse<>(command)); })); } @@ -156,51 +167,59 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands { @Override public Flux> setNX(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getValue(), "Value must not be null"); - return cmd.setnx(command.getKey(), command.getValue()).map((value) -> new BooleanResponse<>(command, value)); + return reactiveCommands.setnx(command.getKey(), command.getValue()) + .map((value) -> new BooleanResponse<>(command, value)); })); } @Override public Flux> setEX(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getValue(), "Value must not be null"); Assert.isTrue(command.getExpiration().isPresent(), "Expiration time must not be null"); - return cmd.setex(command.getKey(), command.getExpiration().get().getExpirationTimeInSeconds(), command.getValue()) - .map(LettuceConverters::stringToBoolean).map((value) -> new BooleanResponse<>(command, value)); + long expirationTimeInSeconds = command.getExpiration().get().getExpirationTimeInSeconds(); + + return reactiveCommands.setex(command.getKey(), expirationTimeInSeconds, command.getValue()) + .map(LettuceConverters::stringToBoolean) + .map((value) -> new BooleanResponse<>(command, value)); })); } @Override public Flux> pSetEX(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getValue(), "Value must not be null"); Assert.isTrue(command.getExpiration().isPresent(), "Expiration time must not be null"); - return cmd - .psetex(command.getKey(), command.getExpiration().get().getExpirationTimeInMilliseconds(), command.getValue()) - .map(LettuceConverters::stringToBoolean).map((value) -> new BooleanResponse<>(command, value)); + long expirationTimeInSeconds = command.getExpiration().get().getExpirationTimeInMilliseconds(); + + return reactiveCommands.psetex(command.getKey(), expirationTimeInSeconds, command.getValue()) + .map(LettuceConverters::stringToBoolean) + .map((value) -> new BooleanResponse<>(command, value)); })); } @Override public Flux> mSet(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notEmpty(command.getKeyValuePairs(), "Pairs must not be null or empty"); - return cmd.mset(command.getKeyValuePairs()).map(LettuceConverters::stringToBoolean) + return reactiveCommands.mset(command.getKeyValuePairs()) + .map(LettuceConverters::stringToBoolean) .map((value) -> new BooleanResponse<>(command, value)); })); } @@ -208,54 +227,56 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands { @Override public Flux> mSetNX(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notEmpty(command.getKeyValuePairs(), "Pairs must not be null or empty"); - return cmd.msetnx(command.getKeyValuePairs()).map((value) -> new BooleanResponse<>(command, value)); + return reactiveCommands.msetnx(command.getKeyValuePairs()) + .map((value) -> new BooleanResponse<>(command, value)); })); } @Override public Flux> append(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getValue(), "Value must not be null"); - return cmd.append(command.getKey(), command.getValue()).map((value) -> new NumericResponse<>(command, value)); + return reactiveCommands.append(command.getKey(), command.getValue()) + .map(value -> new NumericResponse<>(command, value)); })); } @Override public Flux> getRange(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getRange(), "Range must not be null"); Range range = command.getRange(); - Mono result = cmd.getrange(command.getKey(), // + Mono result = reactiveCommands.getrange(command.getKey(), // LettuceConverters.getLowerBoundIndex(range), // LettuceConverters.getUpperBoundIndex(range)); - return result.map((value) -> new ByteBufferResponse<>(command, value)); + return result.map(value -> new ByteBufferResponse<>(command, value)); })); } @Override public Flux> setRange(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getValue(), "Value must not be null"); Assert.notNull(command.getOffset(), "Offset must not be null"); - return cmd.setrange(command.getKey(), command.getOffset(), command.getValue()) + return reactiveCommands.setrange(command.getKey(), command.getOffset(), command.getValue()) .map((value) -> new NumericResponse<>(command, value)); })); } @@ -263,12 +284,13 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands { @Override public Flux> getBit(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getOffset(), "Offset must not be null"); - return cmd.getbit(command.getKey(), command.getOffset()).map(LettuceConverters::toBoolean) + return reactiveCommands.getbit(command.getKey(), command.getOffset()) + .map(LettuceConverters::toBoolean) .map(value -> new BooleanResponse<>(command, value)); })); } @@ -276,13 +298,12 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands { @Override public Flux> setBit(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); - Assert.notNull(command.getValue(), "Value must not be null"); Assert.notNull(command.getOffset(), "Offset must not be null"); - return cmd.setbit(command.getKey(), command.getOffset(), command.getValue() ? 1 : 0) + return reactiveCommands.setbit(command.getKey(), command.getOffset(), command.getValue() ? 1 : 0) .map(LettuceConverters::toBoolean).map(respValue -> new BooleanResponse<>(command, respValue)); })); } @@ -290,54 +311,60 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands { @Override public Flux> bitCount(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Range range = command.getRange(); - return (!Range.unbounded().equals(range) - ? cmd.bitcount(command.getKey(), LettuceConverters.getLowerBoundIndex(range), // - LettuceConverters.getUpperBoundIndex(range)) // - : cmd.bitcount(command.getKey())).map(responseValue -> new NumericResponse<>(command, responseValue)); + Mono bitcount = !Range.unbounded().equals(range) + ? reactiveCommands.bitcount(command.getKey(), LettuceConverters.getLowerBoundIndex(range), + LettuceConverters.getUpperBoundIndex(range)) + : reactiveCommands.bitcount(command.getKey()); + + return bitcount.map(responseValue -> new NumericResponse<>(command, responseValue)); })); } @Override public Flux> bitField(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); BitFieldArgs args = LettuceConverters.toBitFieldArgs(command.getSubCommands()); - return cmd.bitfield(command.getKey(), args).collectList().map(value -> new MultiValueResponse<>(command, - value.stream().map(v -> v.getValueOrElse(null)).collect(Collectors.toList()))); + return reactiveCommands.bitfield(command.getKey(), args).collectList() + .map(value -> new MultiValueResponse<>(command, value.stream() + .map(v -> v.getValueOrElse(null)) + .collect(Collectors.toList()))); })); } @Override public Flux> bitOp(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getDestinationKey(), "DestinationKey must not be null"); Assert.notEmpty(command.getKeys(), "Keys must not be null or empty"); - Mono result = null; ByteBuffer destinationKey = command.getDestinationKey(); + ByteBuffer[] sourceKeys = command.getKeys().toArray(new ByteBuffer[0]); + Mono result; + result = switch (command.getBitOp()) { - case AND -> cmd.bitopAnd(destinationKey, sourceKeys); - case OR -> cmd.bitopOr(destinationKey, sourceKeys); - case XOR -> cmd.bitopXor(destinationKey, sourceKeys); + case AND -> reactiveCommands.bitopAnd(destinationKey, sourceKeys); + case OR -> reactiveCommands.bitopOr(destinationKey, sourceKeys); + case XOR -> reactiveCommands.bitopXor(destinationKey, sourceKeys); case NOT -> { Assert.isTrue(sourceKeys.length == 1, "BITOP NOT does not allow more than 1 source key."); - yield cmd.bitopNot(destinationKey, sourceKeys[0]); + yield reactiveCommands.bitopNot(destinationKey, sourceKeys[0]); } - default -> throw new IllegalArgumentException(String.format("Unknown BITOP '%s'.", command.getBitOp())); + default -> throw new IllegalArgumentException(String.format("Unknown BITOP '%s'", command.getBitOp())); }; return result.map(value -> new NumericResponse<>(command, value)); @@ -347,42 +374,36 @@ class LettuceReactiveStringCommands implements ReactiveStringCommands { @Override public Flux> bitPos(Publisher commands) { - return connection.execute(cmd -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).flatMap(command -> { - return Flux.from(commands).flatMap(command -> { - - Mono result; Range range = command.getRange(); + Mono result; if (range.getLowerBound().isBounded()) { - result = cmd.bitpos(command.getKey(), command.getBit(), getLowerValue(range)); + result = reactiveCommands.bitpos(command.getKey(), command.getBit(), getLowerValue(range)); if (range.getUpperBound().isBounded()) { - result = cmd.bitpos(command.getKey(), command.getBit(), getLowerValue(range), getUpperValue(range)); + result = reactiveCommands.bitpos(command.getKey(), command.getBit(), + getLowerValue(range), getUpperValue(range)); } } else { - result = cmd.bitpos(command.getKey(), command.getBit()); + result = reactiveCommands.bitpos(command.getKey(), command.getBit()); } return result.map(respValue -> new NumericResponse<>(command, respValue)); - }); - }); + })); } @Override public Flux> strLen(Publisher commands) { - return connection.execute(cmd -> { - - return Flux.from(commands).concatMap(command -> { - return cmd.strlen(command.getKey()).map(respValue -> new NumericResponse<>(command, respValue)); - }); - }); + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> + reactiveCommands.strlen(command.getKey()).map(respValue -> new NumericResponse<>(command, respValue)))); } protected LettuceReactiveRedisConnection getConnection() { - return connection; + return this.connection; } private static > T getUpperValue(Range range) { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java index 0340b2a76..62ce81dbe 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java @@ -15,6 +15,7 @@ */ package org.springframework.data.redis.connection.lettuce; +import io.lettuce.core.KeyValue; import io.lettuce.core.Limit; import io.lettuce.core.Range; import io.lettuce.core.ScanStream; @@ -22,6 +23,7 @@ import io.lettuce.core.ScoredValue; import io.lettuce.core.Value; import io.lettuce.core.ZAddArgs; import io.lettuce.core.ZStoreArgs; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -48,10 +50,13 @@ import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; /** + * {@link ReactiveZSetCommands} implementation for {@literal Lettuce}. + * * @author Christoph Strobl * @author Mark Paluch * @author Michele Mancioppi * @author Andrey Shlykov + * @author John Blum * @since 2.0 */ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @@ -74,7 +79,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @SuppressWarnings("unchecked") public Flux> zAdd(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notEmpty(command.getTuples(), "Tuples must not be empty or null"); @@ -91,7 +96,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { Tuple tuple = command.getTuples().iterator().next(); - return cmd.zaddincr(command.getKey(), tuple.getScore(), ByteBuffer.wrap(tuple.getValue())) + return reactiveCommands.zaddincr(command.getKey(), tuple.getScore(), ByteBuffer.wrap(tuple.getValue())) .map(value -> new NumericResponse<>(command, value)); } @@ -117,7 +122,8 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { .map(tuple -> ScoredValue.fromNullable(tuple.getScore(), ByteBuffer.wrap(tuple.getValue()))) .toArray(ScoredValue[]::new); - Mono result = args == null ? cmd.zadd(command.getKey(), values) : cmd.zadd(command.getKey(), args, values); + Mono result = args == null ? reactiveCommands.zadd(command.getKey(), values) + : reactiveCommands.zadd(command.getKey(), args, values); return result.map(value -> new NumericResponse<>(command, value)); })); @@ -126,12 +132,14 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @Override public Flux> zRem(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notEmpty(command.getValues(), "Values must not be null or empty"); - return cmd.zrem(command.getKey(), command.getValues().stream().toArray(ByteBuffer[]::new)) + ByteBuffer[] values = command.getValues().toArray(ByteBuffer[]::new); + + return reactiveCommands.zrem(command.getKey(), values) .map(value -> new NumericResponse<>(command, value)); })); } @@ -139,13 +147,13 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @Override public Flux> zIncrBy(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getValue(), "Member must not be null"); Assert.notNull(command.getIncrement(), "Increment value must not be null"); - return cmd.zincrby(command.getKey(), command.getIncrement().doubleValue(), command.getValue()) + return reactiveCommands.zincrby(command.getKey(), command.getIncrement().doubleValue(), command.getValue()) .map(value -> new NumericResponse<>(command, value)); })); } @@ -154,11 +162,11 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { public Flux>> zRandMember( Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).map(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> { Assert.notNull(command.getKey(), "Key must not be null"); - return new CommandResponse<>(command, cmd.zrandmember(command.getKey(), command.getCount())); + return new CommandResponse<>(command, reactiveCommands.zrandmember(command.getKey(), command.getCount())); })); } @@ -166,26 +174,28 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { public Flux>> zRandMemberWithScore( Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).map(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> { Assert.notNull(command.getKey(), "Key must not be null"); - return new CommandResponse<>(command, - cmd.zrandmemberWithScores(command.getKey(), command.getCount()).map(this::toTuple)); + Flux> result = + reactiveCommands.zrandmemberWithScores(command.getKey(), command.getCount()); + + return new CommandResponse<>(command, result.map(this::toTuple)); })); } @Override public Flux> zRank(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getValue(), "Value must not be null"); Mono result = ObjectUtils.nullSafeEquals(command.getDirection(), Direction.ASC) - ? cmd.zrank(command.getKey(), command.getValue()) - : cmd.zrevrank(command.getKey(), command.getValue()); + ? reactiveCommands.zrank(command.getKey(), command.getValue()) + : reactiveCommands.zrevrank(command.getKey(), command.getValue()); return result.map(value -> new NumericResponse<>(command, value)); })); @@ -194,7 +204,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @Override public Flux>> zRange(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getRange(), "Range must not be null"); @@ -206,19 +216,15 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { if (ObjectUtils.nullSafeEquals(command.getDirection(), Direction.ASC)) { if (command.isWithScores()) { - - result = cmd.zrangeWithScores(command.getKey(), start, stop).map(this::toTuple); + result = reactiveCommands.zrangeWithScores(command.getKey(), start, stop).map(this::toTuple); } else { - - result = cmd.zrange(command.getKey(), start, stop).map(value -> toTuple(value, Double.NaN)); + result = reactiveCommands.zrange(command.getKey(), start, stop).map(value -> toTuple(value, Double.NaN)); } } else { if (command.isWithScores()) { - - result = cmd.zrevrangeWithScores(command.getKey(), start, stop).map(this::toTuple); + result = reactiveCommands.zrevrangeWithScores(command.getKey(), start, stop).map(this::toTuple); } else { - - result = cmd.zrevrange(command.getKey(), start, stop).map(value -> toTuple(value, Double.NaN)); + result = reactiveCommands.zrevrange(command.getKey(), start, stop).map(value -> toTuple(value, Double.NaN)); } } @@ -230,7 +236,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @SuppressWarnings("unchecked") public Flux>> zRangeStore(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Source key must not be null"); Assert.notNull(command.getDestKey(), "Destination key must not be null"); @@ -243,17 +249,17 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { if (command.getDirection() == Direction.ASC) { switch (command.getRangeMode()) { - case ByScore -> result = cmd.zrangestorebyscore(command.getDestKey(), command.getKey(), + case ByScore -> result = reactiveCommands.zrangestorebyscore(command.getDestKey(), command.getKey(), (Range) LettuceConverters.toRange(command.getRange()), limit); - case ByLex -> result = cmd.zrangestorebylex(command.getDestKey(), command.getKey(), + case ByLex -> result = reactiveCommands.zrangestorebylex(command.getDestKey(), command.getKey(), RangeConverter.toRange(command.getRange()), limit); default -> throw new IllegalStateException("Unsupported value: " + command.getRangeMode()); } } else { switch (command.getRangeMode()) { - case ByScore -> result = cmd.zrevrangestorebyscore(command.getDestKey(), command.getKey(), + case ByScore -> result = reactiveCommands.zrevrangestorebyscore(command.getDestKey(), command.getKey(), (Range) LettuceConverters.toRange(command.getRange()), limit); - case ByLex -> result = cmd.zrevrangestorebylex(command.getDestKey(), command.getKey(), + case ByLex -> result = reactiveCommands.zrevrangestorebylex(command.getDestKey(), command.getKey(), RangeConverter.toRange(command.getRange()), limit); default -> throw new IllegalStateException("Unsupported value: " + command.getRangeMode()); } @@ -267,7 +273,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { public Flux>> zRangeByScore( Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getRange(), "Range must not be null"); @@ -281,21 +287,19 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { Range range = RangeConverter.toRange(command.getRange()); if (command.isWithScores()) { - if (!isLimited) { - result = cmd.zrangebyscoreWithScores(command.getKey(), range).map(this::toTuple); + result = reactiveCommands.zrangebyscoreWithScores(command.getKey(), range).map(this::toTuple); } else { - result = cmd + result = reactiveCommands .zrangebyscoreWithScores(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) .map(this::toTuple); } } else { - if (!isLimited) { - result = cmd.zrangebyscore(command.getKey(), range).map(value -> toTuple(value, Double.NaN)); + result = reactiveCommands.zrangebyscore(command.getKey(), range).map(value -> toTuple(value, Double.NaN)); } else { - result = cmd.zrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) + result = reactiveCommands.zrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) .map(value -> toTuple(value, Double.NaN)); } } @@ -304,21 +308,18 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { Range range = RangeConverter.toRange(command.getRange()); if (command.isWithScores()) { - if (!isLimited) { - result = cmd.zrevrangebyscoreWithScores(command.getKey(), range).map(this::toTuple); + result = reactiveCommands.zrevrangebyscoreWithScores(command.getKey(), range).map(this::toTuple); } else { - - result = cmd.zrevrangebyscoreWithScores(command.getKey(), range, + result = reactiveCommands.zrevrangebyscoreWithScores(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())).map(this::toTuple); } } else { - if (!isLimited) { - result = cmd.zrevrangebyscore(command.getKey(), range).map(value -> toTuple(value, Double.NaN)); + result = reactiveCommands.zrevrangebyscore(command.getKey(), range) + .map(value -> toTuple(value, Double.NaN)); } else { - - result = cmd.zrevrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) + result = reactiveCommands.zrevrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) .map(value -> toTuple(value, Double.NaN)); } } @@ -331,12 +332,12 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @Override public Flux>> zScan(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getOptions(), "ScanOptions must not be null"); - Flux result = ScanStream.zscan(cmd, command.getKey(), LettuceConverters.toScanArgs(command.getOptions())) + Flux result = ScanStream.zscan(reactiveCommands, command.getKey(), LettuceConverters.toScanArgs(command.getOptions())) .map(this::toTuple); return Mono.just(new CommandResponse<>(command, result)); @@ -346,13 +347,13 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @Override public Flux> zCount(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getRange(), "Range must not be null"); Range range = RangeConverter.toRange(command.getRange()); - Mono result = cmd.zcount(command.getKey(), range); + Mono result = reactiveCommands.zcount(command.getKey(), range); return result.map(value -> new NumericResponse<>(command, value)); })); @@ -361,12 +362,12 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @Override public Flux> zLexCount(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getRange(), "Range must not be null"); - Mono result = cmd.zlexcount(command.getKey(), RangeConverter.toRange(command.getRange())); + Mono result = reactiveCommands.zlexcount(command.getKey(), RangeConverter.toRange(command.getRange())); return result.map(value -> new NumericResponse<>(command, value)); })); @@ -375,17 +376,20 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @Override public Flux>> zPop(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).map(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Flux> result; + if (command.getCount() > 1) { - result = command.getDirection() == PopDirection.MIN ? cmd.zpopmin(command.getKey(), command.getCount()) - : cmd.zpopmax(command.getKey(), command.getCount()); + result = command.getDirection() == PopDirection.MIN + ? reactiveCommands.zpopmin(command.getKey(), command.getCount()) + : reactiveCommands.zpopmax(command.getKey(), command.getCount()); } else { - result = (command.getDirection() == PopDirection.MIN ? cmd.zpopmin(command.getKey()) - : cmd.zpopmax(command.getKey())).flux(); + result = (command.getDirection() == PopDirection.MIN + ? reactiveCommands.zpopmin(command.getKey()) + : reactiveCommands.zpopmax(command.getKey())).flux(); } return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple)); @@ -395,7 +399,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @Override public Flux>> bZPop(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).map(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getTimeout(), "Timeout must not be null"); @@ -404,55 +408,63 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { double timeout = TimeoutUtils.toDoubleSeconds(command.getTimeout(), command.getTimeUnit()); - Mono> result = (command.getDirection() == PopDirection.MIN - ? cmd.bzpopmin(timeout, command.getKey()) - : cmd.bzpopmax(timeout, command.getKey())).filter(Value::hasValue).map(Value::getValue); + Mono>> commandResult = command.getDirection() == PopDirection.MIN + ? reactiveCommands.bzpopmin(timeout, command.getKey()) + : reactiveCommands.bzpopmax(timeout, command.getKey()); + + Mono> result = commandResult.filter(Value::hasValue).map(Value::getValue); return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple).flux()); } + else { - long timeout = command.getTimeUnit().toSeconds(command.getTimeout()); + long timeout = command.getTimeUnit().toSeconds(command.getTimeout()); - Mono> result = (command.getDirection() == PopDirection.MIN - ? cmd.bzpopmin(timeout, command.getKey()) - : cmd.bzpopmax(timeout, command.getKey())).filter(Value::hasValue).map(Value::getValue); + Mono>> commandResult = command.getDirection() == PopDirection.MIN + ? reactiveCommands.bzpopmin(timeout, command.getKey()) + : reactiveCommands.bzpopmax(timeout, command.getKey()); - return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple).flux()); + Mono> result = commandResult.filter(Value::hasValue).map(Value::getValue); + + return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple).flux()); + } })); } @Override public Flux> zCard(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); - return cmd.zcard(command.getKey()).map(value -> new NumericResponse<>(command, value)); + return reactiveCommands.zcard(command.getKey()) + .map(value -> new NumericResponse<>(command, value)); })); } @Override public Flux> zScore(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getValue(), "Value must not be null"); - return cmd.zscore(command.getKey(), command.getValue()).map(value -> new NumericResponse<>(command, value)); + return reactiveCommands.zscore(command.getKey(), command.getValue()) + .map(value -> new NumericResponse<>(command, value)); })); } @Override public Flux> zMScore(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getValues(), "Values must not be null"); - return cmd.zmscore(command.getKey(), command.getValues().toArray(new ByteBuffer[0])) + return reactiveCommands.zmscore(command.getKey(), command.getValues().toArray(new ByteBuffer[0])) .map(value -> new MultiValueResponse<>(command, value)); })); } @@ -461,12 +473,12 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { public Flux> zRemRangeByRank( Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getRange(), "Range must not be null"); - Mono result = cmd.zremrangebyrank(command.getKey(), // + Mono result = reactiveCommands.zremrangebyrank(command.getKey(), // LettuceConverters.getLowerBoundIndex(command.getRange()), // LettuceConverters.getUpperBoundIndex(command.getRange())); @@ -478,13 +490,13 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { public Flux> zRemRangeByScore( Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getRange(), "Range must not be null"); Range range = RangeConverter.toRange(command.getRange()); - Mono result = cmd.zremrangebyscore(command.getKey(), range); + Mono result = reactiveCommands.zremrangebyscore(command.getKey(), range); return result.map(value -> new NumericResponse<>(command, value)); })); @@ -493,12 +505,12 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @Override public Flux> zRemRangeByLex(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); Assert.notNull(command.getRange(), "Range must not be null"); - Mono result = cmd.zremrangebylex(command.getKey(), RangeConverter.toRange(command.getRange())); + Mono result = reactiveCommands.zremrangebylex(command.getKey(), RangeConverter.toRange(command.getRange())); return result.map(value -> new NumericResponse<>(command, value)); })); @@ -507,37 +519,41 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { @Override public Flux>> zDiff(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).map(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> { Assert.notEmpty(command.getKeys(), "Keys must not be null or empty"); ByteBuffer[] sourceKeys = command.getKeys().toArray(new ByteBuffer[0]); - return new CommandResponse<>(command, cmd.zdiff(sourceKeys)); + + return new CommandResponse<>(command, reactiveCommands.zdiff(sourceKeys)); })); } @Override public Flux>> zDiffWithScores(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).map(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> { Assert.notEmpty(command.getKeys(), "Keys must not be null or empty"); ByteBuffer[] sourceKeys = command.getKeys().toArray(new ByteBuffer[0]); - return new CommandResponse<>(command, cmd.zdiffWithScores(sourceKeys).map(this::toTuple)); + + return new CommandResponse<>(command, reactiveCommands.zdiffWithScores(sourceKeys).map(this::toTuple)); })); } @Override public Flux> zDiffStore(Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Destination key must not be null"); Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty"); ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]); - return cmd.zdiffstore(command.getKey(), sourceKeys).map(value -> new NumericResponse<>(command, value)); + + return reactiveCommands.zdiffstore(command.getKey(), sourceKeys) + .map(value -> new NumericResponse<>(command, value)); })); } @@ -545,18 +561,23 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { public Flux>> zInter( Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).map(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> { Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty"); ZStoreArgs args = null; + if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) { args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null, command.getWeights()); } ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]); - Flux result = args != null ? cmd.zinter(args, sourceKeys) : cmd.zinter(sourceKeys); + + Flux result = args != null + ? reactiveCommands.zinter(args, sourceKeys) + : reactiveCommands.zinter(sourceKeys); + return new CommandResponse<>(command, result); })); } @@ -565,19 +586,23 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { public Flux>> zInterWithScores( Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).map(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> { Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty"); ZStoreArgs args = null; + if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) { args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null, command.getWeights()); } ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]); - Flux> result = args != null ? cmd.zinterWithScores(args, sourceKeys) - : cmd.zinterWithScores(sourceKeys); + + Flux> result = args != null + ? reactiveCommands.zinterWithScores(args, sourceKeys) + : reactiveCommands.zinterWithScores(sourceKeys); + return new CommandResponse<>(command, result.map(this::toTuple)); })); } @@ -586,20 +611,24 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { public Flux> zInterStore( Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Destination key must not be null"); Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty"); ZStoreArgs args = null; + if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) { args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null, command.getWeights()); } ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]); - Mono result = args != null ? cmd.zinterstore(command.getKey(), args, sourceKeys) - : cmd.zinterstore(command.getKey(), sourceKeys); + + Mono result = args != null + ? reactiveCommands.zinterstore(command.getKey(), args, sourceKeys) + : reactiveCommands.zinterstore(command.getKey(), sourceKeys); + return result.map(value -> new NumericResponse<>(command, value)); })); } @@ -608,18 +637,23 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { public Flux>> zUnion( Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).map(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> { Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty"); ZStoreArgs args = null; + if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) { args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null, command.getWeights()); } - ByteBuffer[] sourceKeys = command.getSourceKeys().stream().toArray(ByteBuffer[]::new); - Flux result = args != null ? cmd.zunion(args, sourceKeys) : cmd.zunion(sourceKeys); + ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(ByteBuffer[]::new); + + Flux result = args != null + ? reactiveCommands.zunion(args, sourceKeys) + : reactiveCommands.zunion(sourceKeys); + return new CommandResponse<>(command, result); })); } @@ -628,19 +662,23 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { public Flux>> zUnionWithScores( Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).map(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).map(command -> { Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty"); ZStoreArgs args = null; + if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) { args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null, command.getWeights()); } - ByteBuffer[] sourceKeys = command.getSourceKeys().stream().toArray(ByteBuffer[]::new); - Flux> result = args != null ? cmd.zunionWithScores(args, sourceKeys) - : cmd.zunionWithScores(sourceKeys); + ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(ByteBuffer[]::new); + + Flux> result = args != null + ? reactiveCommands.zunionWithScores(args, sourceKeys) + : reactiveCommands.zunionWithScores(sourceKeys); + return new CommandResponse<>(command, result.map(this::toTuple)); })); } @@ -649,20 +687,24 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { public Flux> zUnionStore( Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Destination key must not be null"); Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty"); ZStoreArgs args = null; + if (command.getAggregateFunction().isPresent() || !command.getWeights().isEmpty()) { args = zStoreArgs(command.getAggregateFunction().isPresent() ? command.getAggregateFunction().get() : null, command.getWeights()); } - ByteBuffer[] sourceKeys = command.getSourceKeys().stream().toArray(ByteBuffer[]::new); - Mono result = args != null ? cmd.zunionstore(command.getKey(), args, sourceKeys) - : cmd.zunionstore(command.getKey(), sourceKeys); + ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(ByteBuffer[]::new); + + Mono result = args != null + ? reactiveCommands.zunionstore(command.getKey(), args, sourceKeys) + : reactiveCommands.zunionstore(command.getKey(), sourceKeys); + return result.map(value -> new NumericResponse<>(command, value)); })); } @@ -671,26 +713,25 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { public Flux>> zRangeByLex( Publisher commands) { - return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + return this.connection.execute(reactiveCommands -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Destination key must not be null"); Flux result; if (!command.getLimit().isUnlimited()) { - if (ObjectUtils.nullSafeEquals(command.getDirection(), Direction.ASC)) { - result = cmd.zrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()), + result = reactiveCommands.zrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()), LettuceConverters.toLimit(command.getLimit())); } else { - result = cmd.zrevrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()), + result = reactiveCommands.zrevrangebylex(command.getKey(), RangeConverter.toRange(command.getRange()), LettuceConverters.toLimit(command.getLimit())); } } else { if (ObjectUtils.nullSafeEquals(command.getDirection(), Direction.ASC)) { - result = cmd.zrangebylex(command.getKey(), RangeConverter.toRange(command.getRange())); + result = reactiveCommands.zrangebylex(command.getKey(), RangeConverter.toRange(command.getRange())); } else { - result = cmd.zrevrangebylex(command.getKey(), RangeConverter.toRange(command.getRange())); + result = reactiveCommands.zrevrangebylex(command.getKey(), RangeConverter.toRange(command.getRange())); } } @@ -701,6 +742,7 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { private static ZStoreArgs zStoreArgs(@Nullable Aggregate aggregate, @Nullable List weights) { ZStoreArgs args = new ZStoreArgs(); + if (aggregate != null) { switch (aggregate) { case MIN -> args.min(); @@ -725,6 +767,6 @@ class LettuceReactiveZSetCommands implements ReactiveZSetCommands { } protected LettuceReactiveRedisConnection getConnection() { - return connection; + return this.connection; } } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStringCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStringCommands.java index 16f96cd99..f6c552f04 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStringCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStringCommands.java @@ -21,7 +21,6 @@ import io.lettuce.core.api.async.RedisStringAsyncCommands; import java.util.List; import java.util.Map; -import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.BitFieldSubCommands; import org.springframework.data.redis.connection.RedisStringCommands; @@ -31,9 +30,12 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** + * {@link RedisStringCommands} implementation for {@literal Lettuce}. + * * @author Christoph Strobl * @author Mark Paluch * @author dengliming + * @author John Blum * @since 2.0 */ class LettuceStringCommands implements RedisStringCommands { @@ -279,9 +281,8 @@ class LettuceStringCommands implements RedisStringCommands { throw new IllegalArgumentException("Bitop NOT should only be performed against one key"); } - return connection.invoke().just(it -> { - - return switch (op) { + return connection.invoke().just(it -> + switch (op) { case AND -> it.bitopAnd(destination, keys); case OR -> it.bitopOr(destination, keys); case XOR -> it.bitopXor(destination, keys); @@ -291,8 +292,7 @@ class LettuceStringCommands implements RedisStringCommands { } yield it.bitopNot(destination, keys[0]); } - }; - }); + }); } @Nullable @@ -324,11 +324,13 @@ class LettuceStringCommands implements RedisStringCommands { } private static > T getUpperValue(Range range) { + return range.getUpperBound().getValue() .orElseThrow(() -> new IllegalArgumentException("Range does not contain upper bound value")); } private static > T getLowerValue(Range range) { + return range.getLowerBound().getValue() .orElseThrow(() -> new IllegalArgumentException("Range does not contain lower bound value")); } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceZSetCommands.java index 8929abc8a..cc3e63601 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceZSetCommands.java @@ -43,10 +43,13 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** + * {@link RedisZSetCommands} implementation for {@literal Lettuce}. + * * @author Christoph Strobl * @author Mark Paluch * @author Andrey Shlykov * @author Shyngys Sapraliyev + * @author John Blum * @since 2.0 */ class LettuceZSetCommands implements RedisZSetCommands { @@ -532,10 +535,6 @@ class LettuceZSetCommands implements RedisZSetCommands { /** * @since 1.4 - * @param key - * @param cursorId - * @param options - * @return */ public Cursor zScan(byte[] key, long cursorId, ScanOptions options) { @@ -761,6 +760,7 @@ class LettuceZSetCommands implements RedisZSetCommands { if (source.contains(Flag.CH)) { target.ch(); } + return target; } } diff --git a/src/main/java/org/springframework/data/redis/core/BoundOperationsProxyFactory.java b/src/main/java/org/springframework/data/redis/core/BoundOperationsProxyFactory.java index da23b3645..fba239ab0 100644 --- a/src/main/java/org/springframework/data/redis/core/BoundOperationsProxyFactory.java +++ b/src/main/java/org/springframework/data/redis/core/BoundOperationsProxyFactory.java @@ -38,6 +38,7 @@ import org.springframework.util.ReflectionUtils; * automatically by translating interface calls to actual {@code …Operations} interfaces. * * @author Mark Paluch + * @author John Blum * @since 3.0 */ class BoundOperationsProxyFactory { @@ -54,7 +55,6 @@ class BoundOperationsProxyFactory { * @param type the {@link DataType} for which to create a proxy object. * @param operations the {@link RedisOperations} instance. * @param operationsTargetFunction function to extract the actual delegate for method calls. - * @param * @return the proxy object. */ @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -90,7 +90,7 @@ class BoundOperationsProxyFactory { return targetMethodCache.computeIfAbsent(method, it -> { - Class[] paramTypes; + Class[] paramTypes; if (isStreamRead(method)) { paramTypes = new Class[it.getParameterCount()]; @@ -144,13 +144,10 @@ class BoundOperationsProxyFactory { yield null; } case "getOperations" -> delegate.getOps(); + default -> method.getDeclaringClass() == boundOperationsInterface + ? doInvoke(invocation, method, operationsTarget, true) + : doInvoke(invocation, method, delegate, false); }; - - if (method.getDeclaringClass() == boundOperationsInterface) { - return doInvoke(invocation, method, operationsTarget, true); - } - - return doInvoke(invocation, method, delegate, false); } @Nullable diff --git a/src/main/java/org/springframework/data/redis/core/RedisConnectionUtils.java b/src/main/java/org/springframework/data/redis/core/RedisConnectionUtils.java index 506b0f49c..3f3abcac7 100644 --- a/src/main/java/org/springframework/data/redis/core/RedisConnectionUtils.java +++ b/src/main/java/org/springframework/data/redis/core/RedisConnectionUtils.java @@ -155,11 +155,13 @@ public abstract class RedisConnectionUtils { // Use same RedisConnection for further Redis actions within the transaction. // Thread-bound object will get removed by synchronization at transaction completion. RedisConnectionHolder holderToUse = conHolder; + if (holderToUse == null) { holderToUse = new RedisConnectionHolder(connection); } else { holderToUse.setConnection(connection); } + holderToUse.requested(); // Consider callback-scope connection binding vs. transaction scope binding @@ -193,25 +195,26 @@ public abstract class RedisConnectionUtils { return factory.getConnection(); } - private static void potentiallyRegisterTransactionSynchronisation(RedisConnectionHolder connHolder, + private static void potentiallyRegisterTransactionSynchronisation(RedisConnectionHolder connectionHolder, final RedisConnectionFactory factory) { // Should go actually into RedisTransactionManager - if (!connHolder.isTransactionActive()) { + if (!connectionHolder.isTransactionActive()) { - connHolder.setTransactionActive(true); - connHolder.setSynchronizedWithTransaction(true); - connHolder.requested(); + connectionHolder.setTransactionActive(true); + connectionHolder.setSynchronizedWithTransaction(true); + connectionHolder.requested(); - RedisConnection conn = connHolder.getRequiredConnection(); + RedisConnection connection = connectionHolder.getRequiredConnection(); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); + if (!readOnly) { - conn.multi(); + connection.multi(); } TransactionSynchronizationManager - .registerSynchronization(new RedisTransactionSynchronizer(connHolder, conn, factory, readOnly)); + .registerSynchronization(new RedisTransactionSynchronizer(connectionHolder, connection, factory, readOnly)); } } @@ -224,6 +227,7 @@ public abstract class RedisConnectionUtils { RedisConnectionFactory factory) { ProxyFactory proxyFactory = new ProxyFactory(connection); + proxyFactory.addAdvice(new ConnectionSplittingInterceptor(factory)); proxyFactory.addInterface(RedisConnectionProxy.class); @@ -259,8 +263,8 @@ public abstract class RedisConnectionUtils { // release transactional/read-only and non-transactional/non-bound connections. // transactional connections for read-only transactions get no synchronizer registered - unbindConnection(factory); + return; } @@ -271,21 +275,21 @@ public abstract class RedisConnectionUtils { * Determine whether the given two RedisConnections are equal, asking the target {@link RedisConnection} in case of a * proxy. Used to detect equality even if the user passed in a raw target Connection while the held one is a proxy. * - * @param conHolder the {@link RedisConnectionHolder} for the held Connection (potentially a proxy) - * @param passedInCon the {@link RedisConnection} passed-in by the user (potentially a target Connection without + * @param connectionHolder the {@link RedisConnectionHolder} for the held Connection (potentially a proxy) + * @param passedInConnetion the {@link RedisConnection} passed-in by the user (potentially a target Connection without * proxy) * @return whether the given Connections are equal * @see #getTargetConnection */ - private static boolean connectionEquals(RedisConnectionHolder conHolder, RedisConnection passedInCon) { + private static boolean connectionEquals(RedisConnectionHolder connectionHolder, RedisConnection passedInConnetion) { - if (!conHolder.hasConnection()) { + if (!connectionHolder.hasConnection()) { return false; } - RedisConnection heldCon = conHolder.getRequiredConnection(); + RedisConnection heldConnection = connectionHolder.getRequiredConnection(); - return heldCon.equals(passedInCon) || getTargetConnection(heldCon).equals(passedInCon); + return heldConnection.equals(passedInConnetion) || getTargetConnection(heldConnection).equals(passedInConnetion); } /** @@ -293,19 +297,19 @@ public abstract class RedisConnectionUtils { * {@link RedisConnection} is a proxy, it will be unwrapped until a non-proxy {@link RedisConnection} is found. * Otherwise, the passed-in {@link RedisConnection} will be returned as-is. * - * @param con the {@link RedisConnection} proxy to unwrap + * @param connection the {@link RedisConnection} proxy to unwrap * @return the innermost target Connection, or the passed-in one if no proxy * @see RedisConnectionProxy#getTargetConnection() */ - private static RedisConnection getTargetConnection(RedisConnection con) { + private static RedisConnection getTargetConnection(RedisConnection connection) { - RedisConnection conToUse = con; + RedisConnection connectionToUse = connection; - while (conToUse instanceof RedisConnectionProxy) { - conToUse = ((RedisConnectionProxy) conToUse).getTargetConnection(); + while (connectionToUse instanceof RedisConnectionProxy) { + connectionToUse = ((RedisConnectionProxy) connectionToUse).getTargetConnection(); } - return conToUse; + return connectionToUse; } /** @@ -317,9 +321,10 @@ public abstract class RedisConnectionUtils { */ public static void unbindConnection(RedisConnectionFactory factory) { - RedisConnectionHolder conHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory); + RedisConnectionHolder connectionHolder = + (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory); - if (conHolder == null) { + if (connectionHolder == null) { return; } @@ -327,16 +332,17 @@ public abstract class RedisConnectionUtils { log.debug("Unbinding Redis Connection"); } - if (conHolder.isTransactionActive()) { + if (connectionHolder.isTransactionActive()) { if (log.isDebugEnabled()) { log.debug("Redis Connection will be closed when outer transaction finished"); } } else { - RedisConnection connection = conHolder.getConnection(); - conHolder.released(); + RedisConnection connection = connectionHolder.getConnection(); - if (!conHolder.isOpen()) { + connectionHolder.released(); + + if (!connectionHolder.isOpen()) { TransactionSynchronizationManager.unbindResourceIfPossible(factory); @@ -349,18 +355,18 @@ public abstract class RedisConnectionUtils { * Return whether the given Redis connection is transactional, that is, bound to the current thread by Spring's * transaction facilities. * - * @param conn Redis connection to check - * @param connFactory Redis connection factory that the connection was created with + * @param connection Redis connection to check + * @param connectionFactory Redis connection factory that the connection was created with * @return whether the connection is transactional or not */ - public static boolean isConnectionTransactional(RedisConnection conn, RedisConnectionFactory connFactory) { + public static boolean isConnectionTransactional(RedisConnection connection, RedisConnectionFactory connectionFactory) { - Assert.notNull(connFactory, "No RedisConnectionFactory specified"); + Assert.notNull(connectionFactory, "No RedisConnectionFactory specified"); - RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager - .getResource(connFactory); + RedisConnectionHolder connectionHolder = (RedisConnectionHolder) TransactionSynchronizationManager + .getResource(connectionFactory); - return connHolder != null && connectionEquals(connHolder, conn); + return connectionHolder != null && connectionEquals(connectionHolder, connection); } private static void doCloseConnection(@Nullable RedisConnection connection) { @@ -392,15 +398,16 @@ public abstract class RedisConnectionUtils { */ private static class RedisTransactionSynchronizer implements TransactionSynchronization { - private final RedisConnectionHolder connHolder; + private final RedisConnectionHolder connectionHolder; private final RedisConnection connection; private final RedisConnectionFactory factory; + private final boolean readOnly; - RedisTransactionSynchronizer(RedisConnectionHolder connHolder, RedisConnection connection, + RedisTransactionSynchronizer(RedisConnectionHolder connectionHolder, RedisConnection connection, RedisConnectionFactory factory, boolean readOnly) { - this.connHolder = connHolder; + this.connectionHolder = connectionHolder; this.connection = connection; this.factory = factory; this.readOnly = readOnly; @@ -423,10 +430,10 @@ public abstract class RedisConnectionUtils { log.debug("Closing bound connection after transaction completed with " + status); } - connHolder.setTransactionActive(false); + connectionHolder.setTransactionActive(false); doCloseConnection(connection); TransactionSynchronizationManager.unbindResource(factory); - connHolder.reset(); + connectionHolder.reset(); } } } @@ -527,12 +534,12 @@ public abstract class RedisConnectionUtils { * Return whether this holder currently has a {@link RedisConnection}. */ protected boolean hasConnection() { - return (this.connection != null); + return this.connection != null; } @Nullable public RedisConnection getConnection() { - return connection; + return this.connection; } public RedisConnection getRequiredConnection() { diff --git a/src/main/java/org/springframework/data/redis/core/RedisKeyValueAdapter.java b/src/main/java/org/springframework/data/redis/core/RedisKeyValueAdapter.java index 51b232eac..34d309918 100644 --- a/src/main/java/org/springframework/data/redis/core/RedisKeyValueAdapter.java +++ b/src/main/java/org/springframework/data/redis/core/RedisKeyValueAdapter.java @@ -33,7 +33,6 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationListener; import org.springframework.core.convert.ConversionService; -import org.springframework.core.convert.ConverterNotFoundException; import org.springframework.data.keyvalue.core.AbstractKeyValueAdapter; import org.springframework.data.keyvalue.core.KeyValueAdapter; import org.springframework.data.mapping.PersistentPropertyAccessor; @@ -99,6 +98,7 @@ import org.springframework.util.ObjectUtils; * @author Christoph Strobl * @author Mark Paluch * @author Andrey Muchnik + * @author John Blum * @since 1.7 */ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter @@ -158,6 +158,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter MappingRedisConverter mappingConverter = new MappingRedisConverter(mappingContext, new PathIndexResolver(mappingContext), new ReferenceResolverImpl(redisOps)); + mappingConverter.setCustomConversions(customConversions == null ? new RedisCustomConversions() : customConversions); mappingConverter.afterPropertiesSet(); @@ -191,6 +192,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter public Object put(Object id, Object item, String keyspace) { RedisData rdo = item instanceof RedisData ? (RedisData) item : new RedisData(); + if (!(item instanceof RedisData)) { converter.write(item, rdo); } @@ -229,7 +231,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter byte[] phantomKey = ByteUtils.concat(objectKey, BinaryKeyspaceIdentifier.PHANTOM_SUFFIX); if (expires(rdo)) { - connection.del(phantomKey); connection.hMSet(phantomKey, rdo.getBucket().rawMap()); connection.expire(phantomKey, rdo.getTimeToLive() + PHANTOM_KEY_TTL); @@ -239,11 +240,13 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter } IndexWriter indexWriter = new IndexWriter(connection, converter); + if (isNew) { indexWriter.createIndexes(key, rdo.getIndexedData()); } else { indexWriter.deleteAndUpdateIndexes(key, rdo.getIndexedData()); } + return null; }); @@ -253,10 +256,9 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter @Override public boolean contains(Object id, String keyspace) { - Boolean exists = redisOps - .execute((RedisCallback) connection -> connection.sIsMember(toBytes(keyspace), toBytes(id))); + RedisCallback command = connection -> connection.sIsMember(toBytes(keyspace), toBytes(id)); - return exists != null ? exists : false; + return Boolean.TRUE.equals(this.redisOps.execute(command)); } @Nullable @@ -274,14 +276,16 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter byte[] binId = createKey(stringKeyspace, stringId); - Map raw = redisOps - .execute((RedisCallback>) connection -> connection.hGetAll(binId)); + RedisCallback> command = connection -> connection.hGetAll(binId); + + Map raw = redisOps.execute(command); if (CollectionUtils.isEmpty(raw)) { return null; } RedisData data = new RedisData(raw); + data.setId(stringId); data.setKeyspace(stringKeyspace); @@ -299,9 +303,9 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter byte[] binId = toBytes(id); byte[] binKeyspace = toBytes(keyspace); - T o = get(id, keyspace, type); + T value = get(id, keyspace, type); - if (o != null) { + if (value != null) { byte[] keyToDelete = createKey(asString(keyspace), asString(id)); @@ -314,9 +318,11 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter if (RedisKeyValueAdapter.this.keepShadowCopy()) { RedisPersistentEntity persistentEntity = converter.getMappingContext().getPersistentEntity(type); + if (persistentEntity != null && persistentEntity.isExpiring()) { byte[] phantomKey = ByteUtils.concat(keyToDelete, BinaryKeyspaceIdentifier.PHANTOM_SUFFIX); + connection.del(phantomKey); } } @@ -324,7 +330,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter }); } - return o; + return value; } @Override @@ -344,7 +350,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter * @param type the desired target type. * @param offset index value to start reading. * @param rows maximum number or entities to return. - * @param * @return never {@literal null}. * @since 2.5 */ @@ -362,6 +367,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter } offset = Math.max(0, offset); + if (rows > 0) { keys = keys.subList((int) offset, Math.min((int) offset + rows, keys.size())); } @@ -456,6 +462,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter if (keepShadowCopy()) { // add phantom key so values can be restored byte[] phantomKey = ByteUtils.concat(redisKey, BinaryKeyspaceIdentifier.PHANTOM_SUFFIX); + connection.hMSet(phantomKey, rdo.getBucket().rawMap()); connection.expire(phantomKey, rdo.getTimeToLive() + PHANTOM_KEY_TTL); } @@ -479,6 +486,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter RedisConnection connection) { redisUpdateObject.addFieldToRemove(toBytes(path)); + byte[] value = connection.hGet(redisUpdateObject.targetKey, toBytes(path)); if (value != null && value.length > 0) { @@ -530,7 +538,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter * * @param callback must not be {@literal null}. * @see RedisOperations#execute(RedisCallback) - * @return */ @Nullable public T execute(RedisCallback callback) { @@ -551,7 +558,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter } private String asString(Object value) { - return value instanceof String ? (String) value + return value instanceof String stringValue ? stringValue : getConverter().getConversionService().convert(value, String.class); } @@ -561,10 +568,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter /** * Convert given source to binary representation using the underlying {@link ConversionService}. - * - * @param source - * @return - * @throws ConverterNotFoundException */ public byte[] toBytes(Object source) { @@ -577,13 +580,8 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter /** * Read back and set {@link TimeToLive} for the property. - * - * @param key - * @param target - * @return */ @Nullable - @SuppressWarnings({ "unchecked", "rawtypes" }) private T readBackTimeToLiveIfSet(@Nullable byte[] key, @Nullable T target) { if (target == null || key == null) { @@ -591,9 +589,11 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter } RedisPersistentEntity entity = this.converter.getMappingContext().getRequiredPersistentEntity(target.getClass()); + if (entity.hasExplictTimeToLiveProperty()) { RedisPersistentProperty ttlProperty = entity.getExplicitTimeToLiveProperty(); + if (ttlProperty == null) { return target; } @@ -635,7 +635,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter /** * Configure usage of {@link KeyExpirationEventMessageListener}. * - * @param enableKeyspaceEvents * @since 1.8 */ public void setEnableKeyspaceEvents(EnableKeyspaceEvents enableKeyspaceEvents) { @@ -764,10 +763,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter /** * Creates new {@link MappingExpirationListener}. - * - * @param listenerContainer - * @param ops - * @param converter */ MappingExpirationListener(RedisMessageListenerContainer listenerContainer, RedisOperations ops, RedisConverter converter) { @@ -791,23 +786,24 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter Map hash = ops.execute((RedisCallback>) connection -> { - Map hash1 = connection.hGetAll(phantomKey); + Map phantomValue = connection.hGetAll(phantomKey); - if (!CollectionUtils.isEmpty(hash1)) { + if (!CollectionUtils.isEmpty(phantomValue)) { connection.del(phantomKey); } - return hash1; + return phantomValue; }); Object value = CollectionUtils.isEmpty(hash) ? null : converter.read(Object.class, new RedisData(hash)); byte[] channelAsBytes = message.getChannel(); + String channel = !ObjectUtils.isEmpty(channelAsBytes) ? converter.getConversionService().convert(channelAsBytes, String.class) : null; - RedisKeyExpiredEvent event = new RedisKeyExpiredEvent(channel, key, value); + RedisKeyExpiredEvent event = new RedisKeyExpiredEvent<>(channel, key, value); ops.execute((RedisCallback) connection -> { @@ -910,6 +906,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter } static class Index { + final DataType type; final byte[] key; @@ -917,7 +914,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter this.key = key; this.type = type; } - } } } diff --git a/src/main/java/org/springframework/data/redis/core/TimeoutUtils.java b/src/main/java/org/springframework/data/redis/core/TimeoutUtils.java index 0c8df5f6a..d6833b46c 100644 --- a/src/main/java/org/springframework/data/redis/core/TimeoutUtils.java +++ b/src/main/java/org/springframework/data/redis/core/TimeoutUtils.java @@ -97,11 +97,13 @@ public abstract class TimeoutUtils { } private static long roundUpIfNecessary(long timeout, long convertedTimeout) { + // A 0 timeout blocks some Redis ops indefinitely, round up if that's // not the intention if (timeout > 0 && convertedTimeout == 0) { return 1; } + return convertedTimeout; } } diff --git a/src/main/java/org/springframework/data/redis/repository/query/RedisQueryCreator.java b/src/main/java/org/springframework/data/redis/repository/query/RedisQueryCreator.java index 86e088108..1b6afa8a4 100644 --- a/src/main/java/org/springframework/data/redis/repository/query/RedisQueryCreator.java +++ b/src/main/java/org/springframework/data/redis/repository/query/RedisQueryCreator.java @@ -32,10 +32,11 @@ import org.springframework.data.repository.query.parser.PartTree; import org.springframework.util.CollectionUtils; /** - * Redis specific query creator. + * {@link AbstractQueryCreator} implementation for Redis. * * @author Christoph Strobl * @author Mark Paluch + * @author John Blum * @since 1.7 */ public class RedisQueryCreator extends AbstractQueryCreator, RedisOperationChain> { @@ -56,8 +57,10 @@ public class RedisQueryCreator extends AbstractQueryCreator sink.sismember(part.getProperty().toDotPath(), true); case FALSE -> sink.sismember(part.getProperty().toDotPath(), false); case WITHIN, NEAR -> sink.near(getNearPath(part, iterator)); - default -> throw new IllegalArgumentException( - String.format("%s is not supported for Redis query derivation", part.getType())); + default -> { + String message = String.format("%s is not supported for Redis query derivation", part.getType()); + throw new IllegalArgumentException(message); + } } return sink; @@ -96,22 +99,21 @@ public class RedisQueryCreator extends AbstractQueryCreator iterator) { - Object o = iterator.next(); + Object value = iterator.next(); Point point; Distance distance; - if (o instanceof Circle) { + if (value instanceof Circle) { + point = ((Circle) value).getCenter(); + distance = ((Circle) value).getRadius(); + } else if (value instanceof Point) { - point = ((Circle) o).getCenter(); - distance = ((Circle) o).getRadius(); - } else if (o instanceof Point) { - - point = (Point) o; + point = (Point) value; if (!iterator.hasNext()) { - throw new InvalidDataAccessApiUsageException( - "Expected to find distance value for geo query; Are you missing a parameter"); + String message = "Expected to find distance value for geo query; Are you missing a parameter"; + throw new InvalidDataAccessApiUsageException(message); } Object distObject = iterator.next(); @@ -120,12 +122,18 @@ public class RedisQueryCreator extends AbstractQueryCreator