Pull request: #2525
This commit is contained in:
John Blum
2023-06-14 16:14:56 -07:00
parent 1d293d85a3
commit 8f58c1f27c
5 changed files with 324 additions and 181 deletions

View File

@@ -23,15 +23,18 @@ import org.springframework.data.redis.util.ByteUtils;
import org.springframework.util.Assert;
/**
* Utility class encapsulating functionality commonly used for cluster slot hashing.
*
* @author Christoph Strobl
* @author John Blum
* @since 1.7
*/
public final class ClusterSlotHashUtil {
public abstract class ClusterSlotHashUtil {
public static final int SLOT_COUNT = 16384;
private static final byte SUBKEY_START = '{';
private static final byte SUBKEY_END = '}';
protected static final byte SUBKEY_START = '{';
protected static final byte SUBKEY_END = '}';
private static final int[] LOOKUP_TABLE = { 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7, 0x8108,
0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF, 0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7,
@@ -53,43 +56,12 @@ public final class ClusterSlotHashUtil {
0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1, 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9,
0x9FF8, 0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0 };
private ClusterSlotHashUtil() {
}
/**
* @param keys must not be {@literal null}.
* @return
* @since 2.0
*/
public static boolean isSameSlotForAllKeys(Collection<ByteBuffer> keys) {
Assert.notNull(keys, "Keys must not be null");
if (keys.size() <= 1) {
return true;
}
return isSameSlotForAllKeys((byte[][]) keys.stream() //
.map(ByteBuffer::duplicate) //
.map(ByteUtils::getBytes) //
.toArray(byte[][]::new));
}
/**
* @param keys must not be {@literal null}.
* @return
* @since 2.0
*/
public static boolean isSameSlotForAllKeys(ByteBuffer... keys) {
Assert.notNull(keys, "Keys must not be null");
return isSameSlotForAllKeys(Arrays.asList(keys));
}
/**
* @param keys must not be {@literal null}.
* @return
* Determines whether all keys will hash to the same slot.
*
* @param keys array of keys to evaluate3; must not be {@literal null}.
* @return a boolean value indicating whether all keys will hash to the same slot.
* @throws IllegalArgumentException if the byte array of keys is {@literal null}.
*/
public static boolean isSameSlotForAllKeys(byte[]... keys) {
@@ -100,31 +72,75 @@ public final class ClusterSlotHashUtil {
}
int slot = calculateSlot(keys[0]);
for (int i = 1; i < keys.length; i++) {
if (slot != calculateSlot(keys[i])) {
return false;
}
}
return true;
}
/**
* Determines whether all keys will hash to the same slot.
*
* @param keys array of {@link ByteBuffer} objects containing the keys to evaluate; must not be {@literal null}.
* @return a boolean value indicating whether all keys will hash to the same slot.
* @throws IllegalArgumentException if the array of keys is {@literal null}.
* @see #isSameSlotForAllKeys(Collection)
* @since 2.0
*/
public static boolean isSameSlotForAllKeys(ByteBuffer... keys) {
Assert.notNull(keys, "Keys must not be null");
return isSameSlotForAllKeys(Arrays.asList(keys));
}
/**
* Determines whether all keys will hash to the same slot.
*
* @param keys {@link Collection} of {@link ByteBuffer} objects containing the keys to evaluate;
* must not be {@literal null}.
* @return a boolean value indicating whether all keys will hash to the same slot.
* @throws IllegalArgumentException if the {@link Collection} of keys is {@literal null}.
* @since 2.0
*/
public static boolean isSameSlotForAllKeys(Collection<ByteBuffer> keys) {
Assert.notNull(keys, "Keys must not be null");
if (keys.size() <= 1) {
return true;
}
return isSameSlotForAllKeys(keys.stream()
.map(ByteBuffer::duplicate)
.map(ByteUtils::getBytes)
.toArray(byte[][]::new));
}
/**
* Calculate the slot from the given key.
*
* @param key must not be {@literal null} or empty.
* @return
* @param key {@link String} containing the Redis key to evaluate; must not be {@literal null} or {@literal empty}.
* @return the computed slot based on the given key.
* @throws IllegalArgumentException if the given {@link String key} is {@literal null} or {@literal empty}.
* @see #calculateSlot(byte[])
*/
public static int calculateSlot(String key) {
Assert.hasText(key, "Key must not be null or empty");
return calculateSlot(key.getBytes());
}
/**
* Calculate the slot from the given key.
*
* @param key must not be {@literal null}.
* @return
* @param key array of bytes containing the Redis key to evaluate; must not be {@literal null}.
* @return the computed slot based on the given key.
*/
public static int calculateSlot(byte[] key) {
@@ -132,14 +148,17 @@ public final class ClusterSlotHashUtil {
byte[] finalKey = key;
int start = indexOf(key, SUBKEY_START);
if (start != -1) {
int end = indexOf(key, start + 1, SUBKEY_END);
if (end != -1 && end != start + 1) {
if (start != -1) {
int end = indexOf(key, start + 1, SUBKEY_END);
if (end != -1 && end != start + 1) {
finalKey = new byte[end - (start + 1)];
System.arraycopy(key, start + 1, finalKey, 0, finalKey.length);
}
}
return crc16(finalKey) % SLOT_COUNT;
}
@@ -150,7 +169,6 @@ public final class ClusterSlotHashUtil {
private static int indexOf(byte[] haystack, int start, byte needle) {
for (int i = start; i < haystack.length; i++) {
if (haystack[i] == needle) {
return i;
}
@@ -166,6 +184,7 @@ public final class ClusterSlotHashUtil {
for (byte b : bytes) {
crc = ((crc << 8) ^ LOOKUP_TABLE[((crc >>> 8) ^ (b & 0xFF)) & 0xFF]);
}
return crc & 0xFFFF;
}
}

View File

@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import org.springframework.data.redis.util.RedisAssertions;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
@@ -35,14 +36,25 @@ import org.springframework.util.CollectionUtils;
*/
public class RedisClusterNode extends RedisNode {
private SlotRange slotRange;
/**
* Get {@link RedisClusterNodeBuilder} for creating new {@link RedisClusterNode}.
*
* @return never {@literal null}.
*/
public static RedisClusterNodeBuilder newRedisClusterNode() {
return new RedisClusterNodeBuilder();
}
private @Nullable LinkState linkState;
private Set<Flag> flags;
private final SlotRange slotRange;
protected RedisClusterNode() {
super();
flags = Collections.emptySet();
this.flags = Collections.emptySet();
this.slotRange = SlotRange.empty();
}
/**
@@ -63,8 +75,19 @@ public class RedisClusterNode extends RedisNode {
public RedisClusterNode(String id) {
this(SlotRange.empty());
Assert.notNull(id, "Id must not be null");
this.id = id;
this.id = RedisAssertions.requireObject(id, "Id must not be null");
}
/**
* Creates new {@link RedisClusterNode} with given {@link SlotRange}.
*
* @param slotRange must not be {@literal null}.
*/
public RedisClusterNode(SlotRange slotRange) {
this.flags = Collections.emptySet();
this.slotRange = RedisAssertions.requireObject(slotRange,"SlotRange must not be null");
}
/**
@@ -78,28 +101,8 @@ public class RedisClusterNode extends RedisNode {
super(host, port);
Assert.notNull(slotRange, "SlotRange must not be null");
this.slotRange = slotRange;
}
/**
* Creates new {@link RedisClusterNode} with given {@link SlotRange}.
*
* @param slotRange must not be {@literal null}.
*/
public RedisClusterNode(SlotRange slotRange) {
super();
Assert.notNull(slotRange, "SlotRange must not be null");
this.slotRange = slotRange;
}
{
if (flags == null) {
flags = Collections.emptySet();
}
this.flags = Collections.emptySet();
this.slotRange = RedisAssertions.requireObject(slotRange,"SlotRange must not be null");
}
/**
@@ -108,15 +111,17 @@ public class RedisClusterNode extends RedisNode {
* @return never {@literal null}.
*/
public SlotRange getSlotRange() {
return slotRange;
return this.slotRange;
}
/**
* @param slot
* @return true if slot is covered.
* Determines whether this {@link RedisClusterNode} manages the identified {@link Integer slot} in the cluster.
*
* @param slot {@link Integer} identifying the slot to evaluate.
* @return {@literal true} if slot is covered.
*/
public boolean servesSlot(int slot) {
return slotRange.contains(slot);
return getSlotRange().contains(slot);
}
/**
@@ -124,32 +129,29 @@ public class RedisClusterNode extends RedisNode {
*/
@Nullable
public LinkState getLinkState() {
return linkState;
return this.linkState;
}
/**
* @return true if node is connected to cluster.
*/
public boolean isConnected() {
return LinkState.CONNECTED.equals(linkState);
return LinkState.CONNECTED.equals(getLinkState());
}
/**
* @return never {@literal null}.
*/
@SuppressWarnings("all")
public Set<Flag> getFlags() {
return flags == null ? Collections.emptySet() : flags;
return this.flags != null ? this.flags : Collections.emptySet();
}
/**
* @return true if node is marked as failing.
*/
public boolean isMarkedAsFail() {
if (!CollectionUtils.isEmpty(flags)) {
return flags.contains(Flag.FAIL) || flags.contains(Flag.PFAIL);
}
return false;
return CollectionUtils.containsAny(getFlags(), Arrays.asList(Flag.FAIL, Flag.PFAIL));
}
@Override
@@ -157,22 +159,23 @@ public class RedisClusterNode extends RedisNode {
return super.toString();
}
/**
* Get {@link RedisClusterNodeBuilder} for creating new {@link RedisClusterNode}.
*
* @return never {@literal null}.
*/
public static RedisClusterNodeBuilder newRedisClusterNode() {
return new RedisClusterNodeBuilder();
}
/**
* @author Christoph Strobl
* @author daihuabin
* @author John Blum
* @since 1.7
*/
public static class SlotRange {
/**
* Factory method used to construct a new, empty {@link SlotRange}.
*
* @return a new, empty {@link SlotRange}.
*/
public static SlotRange empty() {
return new SlotRange(Collections.emptySet());
}
private final BitSet range;
/**
@@ -185,18 +188,20 @@ public class RedisClusterNode extends RedisNode {
Assert.notNull(upperBound, "UpperBound must not be null");
this.range = new BitSet(upperBound + 1);
for (int i = lowerBound; i <= upperBound; i++) {
this.range.set(i);
for (int bitindex = lowerBound; bitindex <= upperBound; bitindex++) {
this.range.set(bitindex);
}
}
public SlotRange(Collection<Integer> range) {
if (CollectionUtils.isEmpty(range)) {
this.range = new BitSet(0);
} else {
this.range = new BitSet(ClusterSlotHashUtil.SLOT_COUNT);
for (Integer pos : range) {
this.range.set(pos);
for (Integer bitindex : range) {
this.range.set(bitindex);
}
}
}
@@ -205,53 +210,60 @@ public class RedisClusterNode extends RedisNode {
this.range = (BitSet) range.clone();
}
@Override
public String toString() {
return Arrays.toString(this.getSlotsArray());
}
/**
* @param slot
* Determines whether this {@link SlotRange} contains the given {@link Integer slot}, which implies
* this cluster nodes manages the slot holding data stored in Redis.
*
* @param slot {@link Integer slot} to evaluate.
* @return true when slot is part of the range.
*/
public boolean contains(int slot) {
return range.get(slot);
return this.range.get(slot);
}
/**
* @return
* Gets all slots in this {@link SlotRange} managed by this cluster node.
*
* @return all slots in this {@link SlotRange}.
*/
public Set<Integer> getSlots() {
if (range.isEmpty()) {
if (this.range.isEmpty()) {
return Collections.emptySet();
}
LinkedHashSet<Integer> slots = new LinkedHashSet<>(Math.max(2 * range.cardinality(), 11));
for (int i = 0; i < range.length(); i++) {
if (range.get(i)) {
slots.add(i);
Set<Integer> slots = new LinkedHashSet<>(Math.max(2 * this.range.cardinality(), 11));
for (int bitindex = 0; bitindex < this.range.length(); bitindex++) {
if (this.range.get(bitindex)) {
slots.add(bitindex);
}
}
return Collections.unmodifiableSet(slots);
}
public int[] getSlotsArray() {
if (range.isEmpty()) {
if (this.range.isEmpty()) {
return new int[0];
}
int[] slots = new int[range.cardinality()];
int pos = 0;
for (int i = 0; i < ClusterSlotHashUtil.SLOT_COUNT; i++) {
if (this.range.get(i)) {
slots[pos++] = i;
int[] slots = new int[this.range.cardinality()];
int arrayIndex = 0;
for (int slot = 0; slot < ClusterSlotHashUtil.SLOT_COUNT; slot++) {
if (this.range.get(slot)) {
slots[arrayIndex++] = slot;
}
}
return slots;
}
public static SlotRange empty() {
return new SlotRange(Collections.emptySet());
@Override
public String toString() {
return Arrays.toString(getSlotsArray());
}
}
@@ -267,10 +279,16 @@ public class RedisClusterNode extends RedisNode {
* @author Christoph Strobl
* @since 1.7
*/
public static enum Flag {
public enum Flag {
MYSELF("myself"), MASTER("master"), REPLICA("slave"), FAIL("fail"), PFAIL("fail?"), HANDSHAKE("handshake"), NOADDR(
"noaddr"), NOFLAGS("noflags");
MYSELF("myself"),
MASTER("master"),
REPLICA("slave"),
FAIL("fail"),
PFAIL("fail?"),
HANDSHAKE("handshake"),
NOADDR("noaddr"),
NOFLAGS("noflags");
private String raw;
@@ -279,9 +297,8 @@ public class RedisClusterNode extends RedisNode {
}
public String getRaw() {
return raw;
return this.raw;
}
}
/**

View File

@@ -52,13 +52,14 @@ import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
* Common type converters
* Common type converters.
*
* @author Jennifer Hickey
* @author Thomas Darimont
* @author Mark Paluch
* @author Christoph Strobl
* @author daihuabin
* @author John Blum
*/
public abstract class Converters {
@@ -100,19 +101,24 @@ public abstract class Converters {
}
public static Properties toProperties(String source) {
Properties info = new Properties();
try (StringReader stringReader = new StringReader(source)) {
info.load(stringReader);
} catch (Exception ex) {
throw new RedisSystemException("Cannot read Redis info", ex);
} catch (Exception cause) {
throw new RedisSystemException("Cannot read Redis info", cause);
}
return info;
}
public static Properties toProperties(Map<?, ?> source) {
Properties target = new Properties();
target.putAll(source);
return target;
}
@@ -170,20 +176,20 @@ public abstract class Converters {
*/
public static Set<RedisClusterNode> toSetOfRedisClusterNodes(String clusterNodes) {
if (!StringUtils.hasText(clusterNodes)) {
return Collections.emptySet();
}
String[] lines = clusterNodes.split(CLUSTER_NODES_LINE_SEPARATOR);
return toSetOfRedisClusterNodes(Arrays.asList(lines));
return StringUtils.hasText(clusterNodes)
? toSetOfRedisClusterNodes(Arrays.asList(clusterNodes.split(CLUSTER_NODES_LINE_SEPARATOR)))
: Collections.emptySet();
}
public static List<Object> toObjects(Set<Tuple> tuples) {
List<Object> tupleArgs = new ArrayList<>(tuples.size() * 2);
for (Tuple tuple : tuples) {
tupleArgs.add(tuple.getScore());
tupleArgs.add(tuple.getValue());
}
return tupleArgs;
}
@@ -228,11 +234,7 @@ public abstract class Converters {
Assert.notNull(targetUnit, "TimeUnit must not be null");
if (seconds > 0) {
return targetUnit.convert(seconds, TimeUnit.SECONDS);
}
return seconds;
return seconds > 0 ? targetUnit.convert(seconds, TimeUnit.SECONDS) : seconds;
}
/**
@@ -258,11 +260,7 @@ public abstract class Converters {
Assert.notNull(targetUnit, "TimeUnit must not be null");
if (milliseconds > 0) {
return targetUnit.convert(milliseconds, TimeUnit.MILLISECONDS);
}
return milliseconds;
return milliseconds > 0 ? targetUnit.convert(milliseconds, TimeUnit.MILLISECONDS) : milliseconds;
}
/**
@@ -285,6 +283,7 @@ public abstract class Converters {
*/
public static <V> Converter<GeoResults<GeoLocation<byte[]>>, GeoResults<GeoLocation<V>>> deserializingGeoResultsConverter(
RedisSerializer<V> serializer) {
return new DeserializingGeoResultsConverter<>(serializer);
}
@@ -314,9 +313,8 @@ public abstract class Converters {
Properties properties = new Properties();
for (int i = 0; i < input.size(); i += 2) {
properties.setProperty(input.get(i), input.get(i + 1));
for (int index = 0; index < input.size(); index += 2) {
properties.setProperty(input.get(index), input.get(index + 1));
}
return properties;
@@ -339,7 +337,7 @@ public abstract class Converters {
* @return the converter.
* @since 2.0
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <K, V> Converter<Map<K, V>, Properties> mapToPropertiesConverter() {
return (Converter) MapToPropertiesConverter.INSTANCE;
}
@@ -387,11 +385,13 @@ public abstract class Converters {
if (targetType == null) {
String alternatePath = sourcePath.contains(".") ? sourcePath.substring(0, sourcePath.lastIndexOf(".")) + ".*"
String alternatePath = sourcePath.contains(".")
? sourcePath.substring(0, sourcePath.lastIndexOf(".")) + ".*"
: sourcePath;
targetType = typeHintMap.get(alternatePath);
if (targetType == null) {
targetType = typeHintMap.get(alternatePath);
if (targetType == null) {
if (sourcePath.endsWith("[]")) {
targetType = String.class;
} else {
@@ -416,7 +416,6 @@ public abstract class Converters {
}
if (ClassUtils.isAssignable(String.class, targetType)) {
if (source instanceof String) {
return source.toString();
}
@@ -432,9 +431,11 @@ public abstract class Converters {
List<Object> sourceCollection = (List<Object>) source;
List<Object> targetList = new ArrayList<>();
for (int i = 0; i < sourceCollection.size(); i++) {
targetList.add(parse(sourceCollection.get(i), sourcePath + ".[" + i + "]", typeHintMap));
}
return targetList;
}
@@ -442,11 +443,14 @@ public abstract class Converters {
List<Object> sourceCollection = ((List<Object>) source);
Map<String, Object> targetMap = new LinkedHashMap<>();
for (int i = 0; i < sourceCollection.size(); i = i + 2) {
String key = parse(sourceCollection.get(i), path + ".[]", typeHintMap).toString();
targetMap.put(key, parse(sourceCollection.get(i + 1), path + "." + key, typeHintMap));
}
return targetMap;
}
@@ -520,11 +524,10 @@ public abstract class Converters {
public GeoResults<GeoLocation<V>> convert(GeoResults<GeoLocation<byte[]>> source) {
List<GeoResult<GeoLocation<V>>> values = new ArrayList<>(source.getContent().size());
for (GeoResult<GeoLocation<byte[]>> value : source.getContent()) {
values.add(new GeoResult<>(
new GeoLocation<>(serializer.deserialize(value.getContent().getName()), value.getContent().getPoint()),
value.getDistance()));
for (GeoResult<GeoLocation<byte[]>> value : source.getContent()) {
values.add(new GeoResult<>(new GeoLocation<>(serializer.deserialize(value.getContent().getName()),
value.getContent().getPoint()), value.getDistance()));
}
return new GeoResults<>(values, source.getAverageDistance().getMetric());
@@ -540,6 +543,7 @@ public abstract class Converters {
static {
flagLookupMap = new LinkedHashMap<>(Flag.values().length, 1);
for (Flag flag : Flag.values()) {
flagLookupMap.put(flag.getRaw(), flag);
}
@@ -564,12 +568,13 @@ public abstract class Converters {
Set<Flag> flags = parseFlags(args);
String portPart = hostAndPort[1];
if (portPart.contains("@")) {
portPart = portPart.substring(0, portPart.indexOf('@'));
}
RedisClusterNodeBuilder nodeBuilder = RedisClusterNode.newRedisClusterNode()
.listeningAt(hostAndPort[0], Integer.valueOf(portPart)) //
.listeningAt(hostAndPort[0], Integer.parseInt(portPart)) //
.withId(args[ID_INDEX]) //
.promotedAs(flags.contains(Flag.MASTER) ? NodeType.MASTER : NodeType.REPLICA) //
.serving(range) //
@@ -588,11 +593,13 @@ public abstract class Converters {
String raw = args[FLAGS_INDEX];
Set<Flag> flags = new LinkedHashSet<>(8, 1);
if (StringUtils.hasText(raw)) {
for (String flag : raw.split(",")) {
flags.add(flagLookupMap.get(flag));
}
}
return flags;
}
@@ -600,41 +607,38 @@ public abstract class Converters {
String raw = args[LINK_STATE_INDEX];
if (StringUtils.hasText(raw)) {
return LinkState.valueOf(raw.toUpperCase());
}
return LinkState.DISCONNECTED;
return StringUtils.hasText(raw) ? LinkState.valueOf(raw.toUpperCase()) : LinkState.DISCONNECTED;
}
private SlotRange parseSlotRange(String[] args) {
BitSet slots = new BitSet(ClusterSlotHashUtil.SLOT_COUNT);
for (int i = SLOTS_INDEX; i < args.length; i++) {
for (int index = SLOTS_INDEX; index < args.length; index++) {
String raw = args[i];
String raw = args[index];
if (raw.startsWith("[")) {
continue;
}
if (raw.contains("-")) {
String[] slotRange = StringUtils.split(raw, "-");
if (slotRange != null) {
int from = Integer.valueOf(slotRange[0]);
int to = Integer.valueOf(slotRange[1]);
int from = Integer.parseInt(slotRange[0]);
int to = Integer.parseInt(slotRange[1]);
for (int slot = from; slot <= to; slot++) {
slots.set(slot);
}
}
} else {
slots.set(Integer.valueOf(raw));
slots.set(Integer.parseInt(raw));
}
}
return new SlotRange(slots);
}
}
}