add tracking of futures to Srp connection

This commit is contained in:
Costin Leau
2012-06-25 20:33:17 +03:00
parent 7a89656cf0
commit a169f71965

View File

@@ -16,6 +16,7 @@
package org.springframework.data.redis.connection.srp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -42,6 +43,9 @@ import redis.client.RedisException;
import redis.reply.Reply;
import com.google.common.base.Charsets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
/**
* {@code RedisConnection} implementation on top of <a href="https://github.com/spullara/redis-protocol">spullara Redis Protocol</a> library.
@@ -56,8 +60,36 @@ public class SrpConnection implements RedisConnection {
private boolean isClosed = false;
private boolean isMulti = false;
private Pipeline pipeline;
private PipelineTracker callback;
private volatile SrpSubscription subscription;
private static class PipelineTracker implements FutureCallback<Object> {
private List<Object> results = Collections.synchronizedList(new ArrayList<Object>());
private List<ListenableFuture<?>> futures = new ArrayList<ListenableFuture<?>>();
public void onSuccess(Object result) {
results.add(result);
}
public void onFailure(Throwable t) {
results.add(t);
}
public List<Object> complete() {
try {
return Futures.successfulAsList(futures).get();
} catch (Exception ex) {
return results;
}
}
public void addCommand(ListenableFuture<?> future) {
futures.add(future);
Futures.addCallback(future, this);
}
}
public SrpConnection(String host, int port, BlockingQueue<SrpConnection> queue) {
try {
this.client = new RedisClient(host, port);
@@ -126,24 +158,13 @@ public class SrpConnection implements RedisConnection {
public void openPipeline() {
if (pipeline == null) {
callback = new PipelineTracker();
pipeline = client.pipeline();
}
}
public List<Object> closePipeline() {
// if (pipeline != null) {
// //ListenableFuture<MultiBulkReply> reply = pipeline.exec();
// pipeline = null;
// if (reply != null) {
// try {
// return SrpUtils.toList(reply.get().data());
// } catch (Exception ex) {
// throw convertSRAccessException(ex);
// }
// }
// }
throw new UnsupportedOperationException();
//return Collections.emptyList();
return callback.complete();
}
@@ -153,7 +174,7 @@ public class SrpConnection implements RedisConnection {
try {
if (isPipelined()) {
pipeline.sort(key, sort, null, (Object[]) null);
pipeline(pipeline.sort(key, sort, null, (Object[]) null));
return null;
}
return SrpUtils.toBytesList((Reply[]) client.sort(key, sort, null, (Object[]) null).data());
@@ -168,7 +189,7 @@ public class SrpConnection implements RedisConnection {
try {
if (isPipelined()) {
pipeline.sort(key, sort, null, (Object[]) null);
pipeline(pipeline.sort(key, sort, null, (Object[]) null));
return null;
}
return ((Long) client.sort(key, sort, null, (Object[]) null).data());
@@ -180,7 +201,7 @@ public class SrpConnection implements RedisConnection {
public Long dbSize() {
try {
if (isPipelined()) {
pipeline.dbsize();
pipeline(pipeline.dbsize());
return null;
}
return client.dbsize().data();
@@ -194,7 +215,7 @@ public class SrpConnection implements RedisConnection {
public void flushDb() {
try {
if (isPipelined()) {
pipeline.flushdb();
pipeline(pipeline.flushdb());
return;
}
client.flushdb();
@@ -207,7 +228,7 @@ public class SrpConnection implements RedisConnection {
public void flushAll() {
try {
if (isPipelined()) {
pipeline.flushall();
pipeline(pipeline.flushall());
return;
}
client.flushall();
@@ -220,7 +241,7 @@ public class SrpConnection implements RedisConnection {
public void bgSave() {
try {
if (isPipelined()) {
pipeline.bgsave();
pipeline(pipeline.bgsave());
return;
}
client.bgsave();
@@ -233,7 +254,7 @@ public class SrpConnection implements RedisConnection {
public void bgWriteAof() {
try {
if (isPipelined()) {
pipeline.bgrewriteaof();
pipeline(pipeline.bgrewriteaof());
return;
}
client.bgrewriteaof();
@@ -246,7 +267,7 @@ public class SrpConnection implements RedisConnection {
public void save() {
try {
if (isPipelined()) {
pipeline.save();
pipeline(pipeline.save());
return;
}
client.save();
@@ -259,7 +280,7 @@ public class SrpConnection implements RedisConnection {
public List<String> getConfig(String param) {
try {
if (isPipelined()) {
pipeline.config_get(param);
pipeline(pipeline.config_get(param));
return null;
}
return Collections.singletonList(client.config_get(param).toString());
@@ -272,7 +293,7 @@ public class SrpConnection implements RedisConnection {
public Properties info() {
try {
if (isPipelined()) {
pipeline.info();
pipeline(pipeline.info());
return null;
}
return SrpUtils.info(client.info());
@@ -285,7 +306,7 @@ public class SrpConnection implements RedisConnection {
public Long lastSave() {
try {
if (isPipelined()) {
pipeline.lastsave();
pipeline(pipeline.lastsave());
return null;
}
return client.lastsave().data();
@@ -298,7 +319,7 @@ public class SrpConnection implements RedisConnection {
public void setConfig(String param, String value) {
try {
if (isPipelined()) {
pipeline.config_set(param, value);
pipeline(pipeline.config_set(param, value));
return;
}
client.config_set(param, value);
@@ -312,7 +333,7 @@ public class SrpConnection implements RedisConnection {
public void resetConfigStats() {
try {
if (isPipelined()) {
pipeline.config_resetstat();
pipeline(pipeline.config_resetstat());
return;
}
client.config_resetstat();
@@ -326,7 +347,7 @@ public class SrpConnection implements RedisConnection {
byte[] save = "SAVE".getBytes(Charsets.UTF_8);
try {
if (isPipelined()) {
pipeline.shutdown(save, null);
pipeline(pipeline.shutdown(save, null));
return;
}
client.shutdown(save, null);
@@ -339,7 +360,7 @@ public class SrpConnection implements RedisConnection {
public byte[] echo(byte[] message) {
try {
if (isPipelined()) {
pipeline.echo(message);
pipeline(pipeline.echo(message));
return null;
}
return client.echo(message).data();
@@ -352,7 +373,7 @@ public class SrpConnection implements RedisConnection {
public String ping() {
try {
if (isPipelined()) {
pipeline.ping();
pipeline(pipeline.ping());
}
return client.ping().data();
} catch (Exception ex) {
@@ -364,7 +385,7 @@ public class SrpConnection implements RedisConnection {
public Long del(byte[]... keys) {
try {
if (isPipelined()) {
pipeline.del((Object[]) keys);
pipeline(pipeline.del((Object[]) keys));
return null;
}
return client.del((Object[]) keys).data();
@@ -405,7 +426,7 @@ public class SrpConnection implements RedisConnection {
public Boolean exists(byte[] key) {
try {
if (isPipelined()) {
pipeline.exists(key);
pipeline(pipeline.exists(key));
return null;
}
return client.exists(key).data() == 1;
@@ -418,7 +439,7 @@ public class SrpConnection implements RedisConnection {
public Boolean expire(byte[] key, long seconds) {
try {
if (isPipelined()) {
pipeline.expire(key, seconds);
pipeline(pipeline.expire(key, seconds));
return null;
}
return client.expire(key, seconds).data() == 1;
@@ -431,7 +452,7 @@ public class SrpConnection implements RedisConnection {
public Boolean expireAt(byte[] key, long unixTime) {
try {
if (isPipelined()) {
pipeline.expireat(key, unixTime);
pipeline(pipeline.expireat(key, unixTime));
return null;
}
return client.expireat(key, unixTime).data() == 1;
@@ -444,7 +465,7 @@ public class SrpConnection implements RedisConnection {
public Set<byte[]> keys(byte[] pattern) {
try {
if (isPipelined()) {
pipeline.keys(pattern);
pipeline(pipeline.keys(pattern));
return null;
}
return SrpUtils.toSet(client.keys(pattern).data());
@@ -475,7 +496,7 @@ public class SrpConnection implements RedisConnection {
public Boolean persist(byte[] key) {
try {
if (isPipelined()) {
pipeline.persist(key);
pipeline(pipeline.persist(key));
return null;
}
return client.persist(key).data() == 1;
@@ -488,7 +509,7 @@ public class SrpConnection implements RedisConnection {
public Boolean move(byte[] key, int dbIndex) {
try {
if (isPipelined()) {
pipeline.move(key, dbIndex);
pipeline(pipeline.move(key, dbIndex));
return null;
}
return client.move(key, dbIndex).data() == 1;
@@ -501,7 +522,7 @@ public class SrpConnection implements RedisConnection {
public byte[] randomKey() {
try {
if (isPipelined()) {
pipeline.randomkey();
pipeline(pipeline.randomkey());
return null;
}
return client.randomkey().data();
@@ -514,7 +535,7 @@ public class SrpConnection implements RedisConnection {
public void rename(byte[] oldName, byte[] newName) {
try {
if (isPipelined()) {
pipeline.rename(oldName, newName);
pipeline(pipeline.rename(oldName, newName));
return;
}
client.rename(oldName, newName);
@@ -527,7 +548,7 @@ public class SrpConnection implements RedisConnection {
public Boolean renameNX(byte[] oldName, byte[] newName) {
try {
if (isPipelined()) {
pipeline.renamenx(oldName, newName);
pipeline(pipeline.renamenx(oldName, newName));
return null;
}
return (client.renamenx(oldName, newName).data() == 1);
@@ -552,7 +573,7 @@ public class SrpConnection implements RedisConnection {
public Long ttl(byte[] key) {
try {
if (isPipelined()) {
pipeline.ttl(key);
pipeline(pipeline.ttl(key));
return null;
}
return client.ttl(key).data();
@@ -565,7 +586,7 @@ public class SrpConnection implements RedisConnection {
public DataType type(byte[] key) {
try {
if (isPipelined()) {
pipeline.type(key);
pipeline(pipeline.type(key));
return null;
}
return DataType.fromCode(client.type(key).data());
@@ -590,7 +611,7 @@ public class SrpConnection implements RedisConnection {
}
try {
if (isPipelined()) {
pipeline.watch((Object[]) keys);
pipeline(pipeline.watch((Object[]) keys));
}
else {
client.watch((Object[]) keys);
@@ -608,7 +629,7 @@ public class SrpConnection implements RedisConnection {
public byte[] get(byte[] key) {
try {
if (isPipelined()) {
pipeline.get(key);
pipeline(pipeline.get(key));
return null;
}
@@ -622,7 +643,7 @@ public class SrpConnection implements RedisConnection {
public void set(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.set(key, value);
pipeline(pipeline.set(key, value));
return;
}
client.set(key, value);
@@ -636,7 +657,7 @@ public class SrpConnection implements RedisConnection {
public byte[] getSet(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.getset(key, value);
pipeline(pipeline.getset(key, value));
return null;
}
return client.getset(key, value).data();
@@ -649,7 +670,7 @@ public class SrpConnection implements RedisConnection {
public Long append(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.append(key, value);
pipeline(pipeline.append(key, value));
return null;
}
return client.append(key, value).data();
@@ -662,7 +683,7 @@ public class SrpConnection implements RedisConnection {
public List<byte[]> mGet(byte[]... keys) {
try {
if (isPipelined()) {
pipeline.mget((Object[]) keys);
pipeline(pipeline.mget((Object[]) keys));
return null;
}
return SrpUtils.toBytesList(client.mget((Object[]) keys).data());
@@ -675,7 +696,7 @@ public class SrpConnection implements RedisConnection {
public void mSet(Map<byte[], byte[]> tuples) {
try {
if (isPipelined()) {
pipeline.mset((Object[]) SrpUtils.convert(tuples));
pipeline(pipeline.mset((Object[]) SrpUtils.convert(tuples)));
return;
}
client.mset((Object[]) SrpUtils.convert(tuples));
@@ -688,7 +709,7 @@ public class SrpConnection implements RedisConnection {
public void mSetNX(Map<byte[], byte[]> tuples) {
try {
if (isPipelined()) {
pipeline.msetnx((Object[]) SrpUtils.convert(tuples));
pipeline(pipeline.msetnx((Object[]) SrpUtils.convert(tuples)));
return;
}
client.msetnx((Object[]) SrpUtils.convert(tuples));
@@ -701,7 +722,7 @@ public class SrpConnection implements RedisConnection {
public void setEx(byte[] key, long time, byte[] value) {
try {
if (isPipelined()) {
pipeline.setex(key, time, value);
pipeline(pipeline.setex(key, time, value));
return;
}
client.setex(key, time, value);
@@ -714,7 +735,7 @@ public class SrpConnection implements RedisConnection {
public Boolean setNX(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.setnx(key, value);
pipeline(pipeline.setnx(key, value));
return null;
}
return client.setnx(key, value).data() == 1;
@@ -727,7 +748,7 @@ public class SrpConnection implements RedisConnection {
public byte[] getRange(byte[] key, long start, long end) {
try {
if (isPipelined()) {
pipeline.getrange(key, start, end);
pipeline(pipeline.getrange(key, start, end));
return null;
}
return client.getrange(key, start, end).data();
@@ -740,7 +761,7 @@ public class SrpConnection implements RedisConnection {
public Long decr(byte[] key) {
try {
if (isPipelined()) {
pipeline.decr(key);
pipeline(pipeline.decr(key));
return null;
}
return client.decr(key).data();
@@ -753,7 +774,7 @@ public class SrpConnection implements RedisConnection {
public Long decrBy(byte[] key, long value) {
try {
if (isPipelined()) {
pipeline.decrby(key, value);
pipeline(pipeline.decrby(key, value));
return null;
}
return client.decrby(key, value).data();
@@ -766,7 +787,7 @@ public class SrpConnection implements RedisConnection {
public Long incr(byte[] key) {
try {
if (isPipelined()) {
pipeline.incr(key);
pipeline(pipeline.incr(key));
return null;
}
return client.incr(key).data();
@@ -779,7 +800,7 @@ public class SrpConnection implements RedisConnection {
public Long incrBy(byte[] key, long value) {
try {
if (isPipelined()) {
pipeline.incrby(key, value);
pipeline(pipeline.incrby(key, value));
return null;
}
return client.incrby(key, value).data();
@@ -837,7 +858,7 @@ public class SrpConnection implements RedisConnection {
public Long strLen(byte[] key) {
try {
if (isPipelined()) {
pipeline.strlen(key);
pipeline(pipeline.strlen(key));
return null;
}
return client.strlen(key).data();
@@ -854,7 +875,7 @@ public class SrpConnection implements RedisConnection {
public Long lPush(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.lpush(key, new Object[] { value });
pipeline(pipeline.lpush(key, new Object[] { value }));
return null;
}
return client.lpush(key, new Object[] { value }).data();
@@ -867,7 +888,7 @@ public class SrpConnection implements RedisConnection {
public Long rPush(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.rpush(key, new Object[] { value });
pipeline(pipeline.rpush(key, new Object[] { value }));
return null;
}
return client.rpush(key, new Object[] { value }).data();
@@ -880,7 +901,7 @@ public class SrpConnection implements RedisConnection {
public List<byte[]> bLPop(int timeout, byte[]... keys) {
try {
if (isPipelined()) {
// pipeline.blpop(timeout, keys);
// pipeline(pipeline.blpop(timeout, keys));
return null;
}
// return SrpUtils.toBytesList(client.blpop(timeout, keys).data());
@@ -894,7 +915,7 @@ public class SrpConnection implements RedisConnection {
public List<byte[]> bRPop(int timeout, byte[]... keys) {
try {
if (isPipelined()) {
// pipeline.brpop(timeout, keys);
// pipeline(pipeline.brpop(timeout, keys));
return null;
}
// return SrpUtils.toBytesList(client.brpop(timeout, keys).data());
@@ -908,7 +929,7 @@ public class SrpConnection implements RedisConnection {
public byte[] lIndex(byte[] key, long index) {
try {
if (isPipelined()) {
pipeline.lindex(key, index);
pipeline(pipeline.lindex(key, index));
return null;
}
return client.lindex(key, index).data();
@@ -921,7 +942,7 @@ public class SrpConnection implements RedisConnection {
public Long lInsert(byte[] key, Position where, byte[] pivot, byte[] value) {
try {
if (isPipelined()) {
pipeline.linsert(key, SrpUtils.convertPosition(where), pivot, value);
pipeline(pipeline.linsert(key, SrpUtils.convertPosition(where), pivot, value));
return null;
}
return client.linsert(key, SrpUtils.convertPosition(where), pivot, value).data();
@@ -934,7 +955,7 @@ public class SrpConnection implements RedisConnection {
public Long lLen(byte[] key) {
try {
if (isPipelined()) {
pipeline.llen(key);
pipeline(pipeline.llen(key));
return null;
}
return client.llen(key).data();
@@ -947,7 +968,7 @@ public class SrpConnection implements RedisConnection {
public byte[] lPop(byte[] key) {
try {
if (isPipelined()) {
pipeline.lpop(key);
pipeline(pipeline.lpop(key));
return null;
}
return client.lpop(key).data();
@@ -960,7 +981,7 @@ public class SrpConnection implements RedisConnection {
public List<byte[]> lRange(byte[] key, long start, long end) {
try {
if (isPipelined()) {
pipeline.lrange(key, start, end);
pipeline(pipeline.lrange(key, start, end));
return null;
}
return SrpUtils.toBytesList(client.lrange(key, start, end).data());
@@ -973,7 +994,7 @@ public class SrpConnection implements RedisConnection {
public Long lRem(byte[] key, long count, byte[] value) {
try {
if (isPipelined()) {
pipeline.lrem(key, count, value);
pipeline(pipeline.lrem(key, count, value));
return null;
}
return client.lrem(key, count, value).data();
@@ -986,7 +1007,7 @@ public class SrpConnection implements RedisConnection {
public void lSet(byte[] key, long index, byte[] value) {
try {
if (isPipelined()) {
pipeline.lset(key, index, value);
pipeline(pipeline.lset(key, index, value));
return;
}
client.lset(key, index, value);
@@ -999,7 +1020,7 @@ public class SrpConnection implements RedisConnection {
public void lTrim(byte[] key, long start, long end) {
try {
if (isPipelined()) {
pipeline.ltrim(key, start, end);
pipeline(pipeline.ltrim(key, start, end));
return;
}
client.ltrim(key, start, end);
@@ -1012,7 +1033,7 @@ public class SrpConnection implements RedisConnection {
public byte[] rPop(byte[] key) {
try {
if (isPipelined()) {
pipeline.rpop(key);
pipeline(pipeline.rpop(key));
return null;
}
return client.rpop(key).data();
@@ -1025,7 +1046,7 @@ public class SrpConnection implements RedisConnection {
public byte[] rPopLPush(byte[] srcKey, byte[] dstKey) {
try {
if (isPipelined()) {
pipeline.rpoplpush(srcKey, dstKey);
pipeline(pipeline.rpoplpush(srcKey, dstKey));
return null;
}
return client.rpoplpush(srcKey, dstKey).data();
@@ -1038,7 +1059,7 @@ public class SrpConnection implements RedisConnection {
public byte[] bRPopLPush(int timeout, byte[] srcKey, byte[] dstKey) {
try {
if (isPipelined()) {
pipeline.brpoplpush(srcKey, dstKey, timeout);
pipeline(pipeline.brpoplpush(srcKey, dstKey, timeout));
return null;
}
return client.brpoplpush(srcKey, dstKey, timeout).data();
@@ -1051,7 +1072,7 @@ public class SrpConnection implements RedisConnection {
public Long lPushX(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.lpushx(key, value);
pipeline(pipeline.lpushx(key, value));
return null;
}
return client.lpushx(key, value).data();
@@ -1064,7 +1085,7 @@ public class SrpConnection implements RedisConnection {
public Long rPushX(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.rpushx(key, value);
pipeline(pipeline.rpushx(key, value));
return null;
}
return client.rpushx(key, value).data();
@@ -1082,7 +1103,7 @@ public class SrpConnection implements RedisConnection {
public Boolean sAdd(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.sadd(key, new Object[] { value });
pipeline(pipeline.sadd(key, new Object[] { value }));
return null;
}
return (client.sadd(key, new Object[] { value }).data() == 1);
@@ -1095,7 +1116,7 @@ public class SrpConnection implements RedisConnection {
public Long sCard(byte[] key) {
try {
if (isPipelined()) {
pipeline.scard(key);
pipeline(pipeline.scard(key));
return null;
}
return client.scard(key).data();
@@ -1108,7 +1129,7 @@ public class SrpConnection implements RedisConnection {
public Set<byte[]> sDiff(byte[]... keys) {
try {
if (isPipelined()) {
pipeline.sdiff((Object[]) keys);
pipeline(pipeline.sdiff((Object[]) keys));
return null;
}
return SrpUtils.toSet(client.sdiff((Object[]) keys).data());
@@ -1121,7 +1142,7 @@ public class SrpConnection implements RedisConnection {
public Long sDiffStore(byte[] destKey, byte[]... keys) {
try {
if (isPipelined()) {
pipeline.sdiffstore(destKey, (Object[]) keys);
pipeline(pipeline.sdiffstore(destKey, (Object[]) keys));
return null;
}
return client.sdiffstore(destKey, (Object[]) keys).data();
@@ -1134,7 +1155,7 @@ public class SrpConnection implements RedisConnection {
public Set<byte[]> sInter(byte[]... keys) {
try {
if (isPipelined()) {
pipeline.sinter((Object[]) keys);
pipeline(pipeline.sinter((Object[]) keys));
return null;
}
return SrpUtils.toSet(client.sinter((Object[]) keys).data());
@@ -1147,7 +1168,7 @@ public class SrpConnection implements RedisConnection {
public Long sInterStore(byte[] destKey, byte[]... keys) {
try {
if (isPipelined()) {
pipeline.sinterstore(destKey, (Object[]) keys);
pipeline(pipeline.sinterstore(destKey, (Object[]) keys));
return null;
}
return client.sinterstore(destKey, (Object[]) keys).data();
@@ -1160,7 +1181,7 @@ public class SrpConnection implements RedisConnection {
public Boolean sIsMember(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.sismember(key, value);
pipeline(pipeline.sismember(key, value));
return null;
}
return client.sismember(key, value).data() == 1;
@@ -1173,7 +1194,7 @@ public class SrpConnection implements RedisConnection {
public Set<byte[]> sMembers(byte[] key) {
try {
if (isPipelined()) {
pipeline.smembers(key);
pipeline(pipeline.smembers(key));
return null;
}
return SrpUtils.toSet(client.smembers(key).data());
@@ -1186,7 +1207,7 @@ public class SrpConnection implements RedisConnection {
public Boolean sMove(byte[] srcKey, byte[] destKey, byte[] value) {
try {
if (isPipelined()) {
pipeline.smove(srcKey, destKey, value);
pipeline(pipeline.smove(srcKey, destKey, value));
return null;
}
return client.smove(srcKey, destKey, value).data() == 1;
@@ -1199,7 +1220,7 @@ public class SrpConnection implements RedisConnection {
public byte[] sPop(byte[] key) {
try {
if (isPipelined()) {
pipeline.spop(key);
pipeline(pipeline.spop(key));
return null;
}
return client.spop(key).data();
@@ -1212,7 +1233,7 @@ public class SrpConnection implements RedisConnection {
public byte[] sRandMember(byte[] key) {
try {
if (isPipelined()) {
pipeline.srandmember(key);
pipeline(pipeline.srandmember(key));
return null;
}
return client.srandmember(key).data();
@@ -1225,7 +1246,7 @@ public class SrpConnection implements RedisConnection {
public Boolean sRem(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.srem(key, new Object[] { value });
pipeline(pipeline.srem(key, new Object[] { value }));
return null;
}
return client.srem(key, new Object[] { value }).data() == 1;
@@ -1238,7 +1259,7 @@ public class SrpConnection implements RedisConnection {
public Set<byte[]> sUnion(byte[]... keys) {
try {
if (isPipelined()) {
pipeline.sunion((Object[]) keys);
pipeline(pipeline.sunion((Object[]) keys));
return null;
}
return SrpUtils.toSet(client.sunion((Object[]) keys).data());
@@ -1251,7 +1272,7 @@ public class SrpConnection implements RedisConnection {
public Long sUnionStore(byte[] destKey, byte[]... keys) {
try {
if (isPipelined()) {
pipeline.sunionstore(destKey, (Object[]) keys);
pipeline(pipeline.sunionstore(destKey, (Object[]) keys));
return null;
}
return client.sunionstore(destKey, (Object[]) keys).data();
@@ -1268,7 +1289,7 @@ public class SrpConnection implements RedisConnection {
public Boolean zAdd(byte[] key, double score, byte[] value) {
try {
if (isPipelined()) {
pipeline.zadd(new Object[] { key, score, value });
pipeline(pipeline.zadd(new Object[] { key, score, value }));
return null;
}
return client.zadd(new Object[] { key, score, value }).data() == 1;
@@ -1281,7 +1302,7 @@ public class SrpConnection implements RedisConnection {
public Long zCard(byte[] key) {
try {
if (isPipelined()) {
pipeline.zcard(key);
pipeline(pipeline.zcard(key));
return null;
}
return client.zcard(key).data();
@@ -1294,7 +1315,7 @@ public class SrpConnection implements RedisConnection {
public Long zCount(byte[] key, double min, double max) {
try {
if (isQueueing()) {
pipeline.zcount(key, min, max);
pipeline(pipeline.zcount(key, min, max));
return null;
}
return client.zcount(key, min, max).data();
@@ -1307,7 +1328,7 @@ public class SrpConnection implements RedisConnection {
public Double zIncrBy(byte[] key, double increment, byte[] value) {
try {
if (isPipelined()) {
pipeline.zincrby(key, increment, value);
pipeline(pipeline.zincrby(key, increment, value));
return null;
}
return SrpUtils.toDouble(client.zincrby(key, increment, value).data());
@@ -1335,7 +1356,7 @@ public class SrpConnection implements RedisConnection {
try {
if (isQueueing()) {
pipeline.zinterstore(args);
pipeline(pipeline.zinterstore(args));
return null;
}
return client.zinterstore(args).data();
@@ -1347,7 +1368,7 @@ public class SrpConnection implements RedisConnection {
public Set<byte[]> zRange(byte[] key, long start, long end) {
try {
if (isPipelined()) {
pipeline.zrange(key, start, end, null);
pipeline(pipeline.zrange(key, start, end, null));
return null;
}
return SrpUtils.toSet(client.zrange(key, start, end, null).data());
@@ -1360,7 +1381,7 @@ public class SrpConnection implements RedisConnection {
public Set<Tuple> zRangeWithScores(byte[] key, long start, long end) {
try {
if (isPipelined()) {
pipeline.zrange(key, start, end, SrpUtils.WITHSCORES);
pipeline(pipeline.zrange(key, start, end, SrpUtils.WITHSCORES));
return null;
}
return SrpUtils.convertTuple(client.zrange(key, start, end, SrpUtils.WITHSCORES));
@@ -1373,7 +1394,7 @@ public class SrpConnection implements RedisConnection {
public Set<byte[]> zRangeByScore(byte[] key, double min, double max) {
try {
if (isPipelined()) {
pipeline.zrangebyscore(key, min, max, null, null);
pipeline(pipeline.zrangebyscore(key, min, max, null, null));
return null;
}
return SrpUtils.toSet(client.zrangebyscore(key, min, max, null, null).data());
@@ -1386,7 +1407,7 @@ public class SrpConnection implements RedisConnection {
public Set<Tuple> zRangeByScoreWithScores(byte[] key, double min, double max) {
try {
if (isPipelined()) {
pipeline.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, null);
pipeline(pipeline.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, null));
return null;
}
return SrpUtils.convertTuple(client.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, null));
@@ -1399,7 +1420,7 @@ public class SrpConnection implements RedisConnection {
public Set<Tuple> zRevRangeWithScores(byte[] key, long start, long end) {
try {
if (isPipelined()) {
pipeline.zrevrange(key, start, end, SrpUtils.WITHSCORES);
pipeline(pipeline.zrevrange(key, start, end, SrpUtils.WITHSCORES));
return null;
}
return SrpUtils.convertTuple(client.zrevrange(key, start, end, SrpUtils.WITHSCORES));
@@ -1413,7 +1434,7 @@ public class SrpConnection implements RedisConnection {
try {
byte[] limit = SrpUtils.limit(offset, count);
if (isPipelined()) {
pipeline.zrangebyscore(key, min, max, null, limit);
pipeline(pipeline.zrangebyscore(key, min, max, null, limit));
return null;
}
return SrpUtils.toSet(client.zrangebyscore(key, min, max, null, limit).data());
@@ -1427,7 +1448,7 @@ public class SrpConnection implements RedisConnection {
try {
byte[] limit = SrpUtils.limit(offset, count);
if (isPipelined()) {
pipeline.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, limit);
pipeline(pipeline.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, limit));
return null;
}
return SrpUtils.convertTuple(client.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, limit));
@@ -1504,7 +1525,7 @@ public class SrpConnection implements RedisConnection {
public Boolean zRem(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.zrem(key, new Object[] { value });
pipeline(pipeline.zrem(key, new Object[] { value }));
return null;
}
return client.zrem(key, new Object[] { value }).data() == 1;
@@ -1517,7 +1538,7 @@ public class SrpConnection implements RedisConnection {
public Long zRemRange(byte[] key, long start, long end) {
try {
if (isPipelined()) {
pipeline.zremrangebyrank(key, start, end);
pipeline(pipeline.zremrangebyrank(key, start, end));
return null;
}
return client.zremrangebyrank(key, start, end).data();
@@ -1530,7 +1551,7 @@ public class SrpConnection implements RedisConnection {
public Long zRemRangeByScore(byte[] key, double min, double max) {
try {
if (isPipelined()) {
pipeline.zremrangebyscore(key, min, max);
pipeline(pipeline.zremrangebyscore(key, min, max));
return null;
}
return client.zremrangebyscore(key, min, max).data();
@@ -1543,7 +1564,7 @@ public class SrpConnection implements RedisConnection {
public Set<byte[]> zRevRange(byte[] key, long start, long end) {
try {
if (isPipelined()) {
pipeline.zrevrange(key, start, end, null);
pipeline(pipeline.zrevrange(key, start, end, null));
return null;
}
return SrpUtils.toSet(client.zrevrange(key, start, end, null).data());
@@ -1556,7 +1577,7 @@ public class SrpConnection implements RedisConnection {
public Long zRevRank(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.zrevrank(key, value);
pipeline(pipeline.zrevrank(key, value));
return null;
}
return (Long) client.zrevrank(key, value).data();
@@ -1569,7 +1590,7 @@ public class SrpConnection implements RedisConnection {
public Double zScore(byte[] key, byte[] value) {
try {
if (isPipelined()) {
pipeline.zscore(key, value);
pipeline(pipeline.zscore(key, value));
return null;
}
return SrpUtils.toDouble(client.zscore(key, value).data());
@@ -1587,7 +1608,7 @@ public class SrpConnection implements RedisConnection {
public Long zUnionStore(byte[] destKey, byte[]... sets) {
try {
if (isPipelined()) {
pipeline.zunionstore(destKey, sets.length, (Object[]) sets);
pipeline(pipeline.zunionstore(destKey, sets.length, (Object[]) sets));
return null;
}
return client.zunionstore(destKey, sets.length, (Object[]) sets).data();
@@ -1604,7 +1625,7 @@ public class SrpConnection implements RedisConnection {
public Boolean hSet(byte[] key, byte[] field, byte[] value) {
try {
if (isPipelined()) {
pipeline.hset(key, field, value);
pipeline(pipeline.hset(key, field, value));
return null;
}
return client.hset(key, field, value).data() == 1;
@@ -1617,7 +1638,7 @@ public class SrpConnection implements RedisConnection {
public Boolean hSetNX(byte[] key, byte[] field, byte[] value) {
try {
if (isPipelined()) {
pipeline.hsetnx(key, field, value);
pipeline(pipeline.hsetnx(key, field, value));
return null;
}
return client.hsetnx(key, field, value).data() == 1;
@@ -1630,7 +1651,7 @@ public class SrpConnection implements RedisConnection {
public Boolean hDel(byte[] key, byte[] field) {
try {
if (isPipelined()) {
pipeline.hdel(key, new Object[] { field });
pipeline(pipeline.hdel(key, new Object[] { field }));
return null;
}
return client.hdel(key, new Object[] { field }).data() == 1;
@@ -1643,7 +1664,7 @@ public class SrpConnection implements RedisConnection {
public Boolean hExists(byte[] key, byte[] field) {
try {
if (isPipelined()) {
pipeline.hexists(key, field);
pipeline(pipeline.hexists(key, field));
return null;
}
return client.hexists(key, field).data() == 1;
@@ -1656,7 +1677,7 @@ public class SrpConnection implements RedisConnection {
public byte[] hGet(byte[] key, byte[] field) {
try {
if (isPipelined()) {
pipeline.hget(key, field);
pipeline(pipeline.hget(key, field));
return null;
}
return client.hget(key, field).data();
@@ -1669,7 +1690,7 @@ public class SrpConnection implements RedisConnection {
public Map<byte[], byte[]> hGetAll(byte[] key) {
try {
if (isPipelined()) {
pipeline.hgetall(key);
pipeline(pipeline.hgetall(key));
return null;
}
return SrpUtils.toMap(client.hgetall(key).data());
@@ -1682,7 +1703,7 @@ public class SrpConnection implements RedisConnection {
public Long hIncrBy(byte[] key, byte[] field, long delta) {
try {
if (isPipelined()) {
pipeline.hincrby(key, field, delta);
pipeline(pipeline.hincrby(key, field, delta));
return null;
}
return client.hincrby(key, field, delta).data();
@@ -1695,7 +1716,7 @@ public class SrpConnection implements RedisConnection {
public Set<byte[]> hKeys(byte[] key) {
try {
if (isPipelined()) {
pipeline.hkeys(key);
pipeline(pipeline.hkeys(key));
return null;
}
return SrpUtils.toSet(client.hkeys(key).data());
@@ -1708,7 +1729,7 @@ public class SrpConnection implements RedisConnection {
public Long hLen(byte[] key) {
try {
if (isPipelined()) {
pipeline.hlen(key);
pipeline(pipeline.hlen(key));
return null;
}
return client.hlen(key).data();
@@ -1721,7 +1742,7 @@ public class SrpConnection implements RedisConnection {
public List<byte[]> hMGet(byte[] key, byte[]... fields) {
try {
if (isPipelined()) {
pipeline.hmget(key, (Object[]) fields);
pipeline(pipeline.hmget(key, (Object[]) fields));
return null;
}
return SrpUtils.toBytesList(client.hmget(key, (Object[]) fields).data());
@@ -1734,7 +1755,7 @@ public class SrpConnection implements RedisConnection {
public void hMSet(byte[] key, Map<byte[], byte[]> tuple) {
try {
if (isPipelined()) {
pipeline.hmset(key, SrpUtils.convert(tuple));
pipeline(pipeline.hmset(key, SrpUtils.convert(tuple)));
return;
}
client.hmset(key, SrpUtils.convert(tuple));
@@ -1747,7 +1768,7 @@ public class SrpConnection implements RedisConnection {
public List<byte[]> hVals(byte[] key) {
try {
if (isPipelined()) {
pipeline.hvals(key);
pipeline(pipeline.hvals(key));
return null;
}
return SrpUtils.toBytesList(client.hvals(key).data());
@@ -1767,7 +1788,7 @@ public class SrpConnection implements RedisConnection {
throw new UnsupportedOperationException();
}
if (isPipelined()) {
pipeline.publish(channel, message);
pipeline(pipeline.publish(channel, message));
return null;
}
return client.publish(channel, message).data();
@@ -1828,4 +1849,9 @@ public class SrpConnection implements RedisConnection {
"Connection already subscribed; use the connection Subscription to cancel or add new channels");
}
}
// processing method that adds a listener to the future in order to track down the results and close the pipeline
private void pipeline(ListenableFuture<?> future) {
callback.addCommand(future);
}
}