Remove duplicate keys from RedisQueryEngine.
We now avoid duplicate keys if a key is found in two indices. Closes #2799
This commit is contained in:
@@ -20,8 +20,10 @@ import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.springframework.data.geo.Circle;
|
||||
import org.springframework.data.geo.GeoResult;
|
||||
@@ -31,8 +33,10 @@ import org.springframework.data.keyvalue.core.QueryEngine;
|
||||
import org.springframework.data.keyvalue.core.SortAccessor;
|
||||
import org.springframework.data.keyvalue.core.SpelSortAccessor;
|
||||
import org.springframework.data.keyvalue.core.query.KeyValueQuery;
|
||||
import org.springframework.data.redis.connection.RedisConnection;
|
||||
import org.springframework.data.redis.connection.RedisGeoCommands.GeoLocation;
|
||||
import org.springframework.data.redis.connection.RedisGeoCommands.GeoRadiusCommandArgs;
|
||||
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
|
||||
import org.springframework.data.redis.core.convert.GeoIndexedPropertyValue;
|
||||
import org.springframework.data.redis.core.convert.RedisData;
|
||||
import org.springframework.data.redis.repository.query.RedisOperationChain;
|
||||
@@ -40,6 +44,7 @@ import org.springframework.data.redis.repository.query.RedisOperationChain.NearP
|
||||
import org.springframework.data.redis.repository.query.RedisOperationChain.PathAndValue;
|
||||
import org.springframework.data.redis.util.ByteUtils;
|
||||
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||
import org.springframework.lang.NonNullApi;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@@ -95,44 +100,21 @@ class RedisQueryEngine extends QueryEngine<RedisKeyValueAdapter, RedisOperationC
|
||||
|
||||
RedisCallback<Map<byte[], Map<byte[], byte[]>>> callback = connection -> {
|
||||
|
||||
List<byte[]> allKeys = new ArrayList<>();
|
||||
if (!criteria.getSismember().isEmpty()) {
|
||||
allKeys.addAll(connection.sInter(keys(keyspace + ":", criteria.getSismember())));
|
||||
}
|
||||
|
||||
if (!criteria.getOrSismember().isEmpty()) {
|
||||
allKeys.addAll(connection.sUnion(keys(keyspace + ":", criteria.getOrSismember())));
|
||||
}
|
||||
|
||||
if (criteria.getNear() != null) {
|
||||
|
||||
GeoRadiusCommandArgs limit = GeoRadiusCommandArgs.newGeoRadiusArgs();
|
||||
|
||||
if (rows > 0) {
|
||||
limit = limit.limit(rows);
|
||||
}
|
||||
|
||||
GeoResults<GeoLocation<byte[]>> x = connection.geoRadius(geoKey(keyspace + ":", criteria.getNear()),
|
||||
new Circle(criteria.getNear().getPoint(), criteria.getNear().getDistance()), limit);
|
||||
for (GeoResult<GeoLocation<byte[]>> y : x) {
|
||||
allKeys.add(y.getContent().getName());
|
||||
}
|
||||
}
|
||||
|
||||
List<byte[]> keys = findKeys(criteria, rows, keyspace, connection);
|
||||
byte[] keyspaceBin = getRequiredAdapter().getConverter().getConversionService().convert(keyspace + ":",
|
||||
byte[].class);
|
||||
|
||||
Map<byte[], Map<byte[], byte[]>> rawData = new LinkedHashMap<>();
|
||||
|
||||
if (allKeys.isEmpty() || allKeys.size() < offset) {
|
||||
if (keys.isEmpty() || keys.size() < offset) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
int offsetToUse = Math.max(0, (int) offset);
|
||||
if (rows > 0) {
|
||||
allKeys = allKeys.subList(Math.max(0, offsetToUse), Math.min(offsetToUse + rows, allKeys.size()));
|
||||
keys = keys.subList(Math.max(0, offsetToUse), Math.min(offsetToUse + rows, keys.size()));
|
||||
}
|
||||
for (byte[] id : allKeys) {
|
||||
for (byte[] id : keys) {
|
||||
|
||||
byte[] singleKey = ByteUtils.concat(keyspaceBin, id);
|
||||
rawData.put(id, connection.hGetAll(singleKey));
|
||||
@@ -161,6 +143,43 @@ class RedisQueryEngine extends QueryEngine<RedisKeyValueAdapter, RedisOperationC
|
||||
return result;
|
||||
}
|
||||
|
||||
private List<byte[]> findKeys(RedisOperationChain criteria, int rows, String keyspace, RedisConnection connection) {
|
||||
|
||||
List<byte[]> allKeys = new ArrayList<>();
|
||||
|
||||
if (!criteria.getSismember().isEmpty()) {
|
||||
allKeys.addAll(connection.sInter(keys(keyspace + ":", criteria.getSismember())));
|
||||
}
|
||||
|
||||
if (!criteria.getOrSismember().isEmpty()) {
|
||||
allKeys.addAll(connection.sUnion(keys(keyspace + ":", criteria.getOrSismember())));
|
||||
}
|
||||
|
||||
if (criteria.getNear() != null) {
|
||||
|
||||
GeoRadiusCommandArgs limit = GeoRadiusCommandArgs.newGeoRadiusArgs();
|
||||
|
||||
if (rows > 0) {
|
||||
limit = limit.limit(rows);
|
||||
}
|
||||
|
||||
GeoResults<GeoLocation<byte[]>> x = connection.geoRadius(geoKey(keyspace + ":", criteria.getNear()),
|
||||
new Circle(criteria.getNear().getPoint(), criteria.getNear().getDistance()), limit);
|
||||
for (GeoResult<GeoLocation<byte[]>> y : x) {
|
||||
allKeys.add(y.getContent().getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Set<ByteArrayWrapper> unique = new LinkedHashSet<>(allKeys.size());
|
||||
allKeys.forEach(key -> unique.add(new ByteArrayWrapper(key)));
|
||||
|
||||
List<byte[]> uniqueKeys = new ArrayList<>(unique.size());
|
||||
unique.forEach(key -> uniqueKeys.add(key.getArray()));
|
||||
|
||||
return uniqueKeys;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<?> execute(RedisOperationChain criteria, Comparator<?> sort, long offset, int rows,
|
||||
String keyspace) {
|
||||
|
||||
Reference in New Issue
Block a user