diff --git a/src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java b/src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java index b31b2ab13..cd1066fbf 100644 --- a/src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.connection.srp; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -42,6 +43,9 @@ import redis.client.RedisException; import redis.reply.Reply; import com.google.common.base.Charsets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; /** * {@code RedisConnection} implementation on top of spullara Redis Protocol library. @@ -56,8 +60,36 @@ public class SrpConnection implements RedisConnection { private boolean isClosed = false; private boolean isMulti = false; private Pipeline pipeline; + private PipelineTracker callback; private volatile SrpSubscription subscription; + private static class PipelineTracker implements FutureCallback { + + private List results = Collections.synchronizedList(new ArrayList()); + private List> futures = new ArrayList>(); + + public void onSuccess(Object result) { + results.add(result); + } + + public void onFailure(Throwable t) { + results.add(t); + } + + public List complete() { + try { + return Futures.successfulAsList(futures).get(); + } catch (Exception ex) { + return results; + } + } + + public void addCommand(ListenableFuture future) { + futures.add(future); + Futures.addCallback(future, this); + } + } + public SrpConnection(String host, int port, BlockingQueue queue) { try { this.client = new RedisClient(host, port); @@ -126,24 +158,13 @@ public class SrpConnection implements RedisConnection { public void openPipeline() { if (pipeline == null) { + callback = new PipelineTracker(); pipeline = client.pipeline(); } } public List closePipeline() { - // if (pipeline != null) { - // //ListenableFuture reply = pipeline.exec(); - // pipeline = null; - // if (reply != null) { - // try { - // return SrpUtils.toList(reply.get().data()); - // } catch (Exception ex) { - // throw convertSRAccessException(ex); - // } - // } - // } - throw new UnsupportedOperationException(); - //return Collections.emptyList(); + return callback.complete(); } @@ -153,7 +174,7 @@ public class SrpConnection implements RedisConnection { try { if (isPipelined()) { - pipeline.sort(key, sort, null, (Object[]) null); + pipeline(pipeline.sort(key, sort, null, (Object[]) null)); return null; } return SrpUtils.toBytesList((Reply[]) client.sort(key, sort, null, (Object[]) null).data()); @@ -168,7 +189,7 @@ public class SrpConnection implements RedisConnection { try { if (isPipelined()) { - pipeline.sort(key, sort, null, (Object[]) null); + pipeline(pipeline.sort(key, sort, null, (Object[]) null)); return null; } return ((Long) client.sort(key, sort, null, (Object[]) null).data()); @@ -180,7 +201,7 @@ public class SrpConnection implements RedisConnection { public Long dbSize() { try { if (isPipelined()) { - pipeline.dbsize(); + pipeline(pipeline.dbsize()); return null; } return client.dbsize().data(); @@ -194,7 +215,7 @@ public class SrpConnection implements RedisConnection { public void flushDb() { try { if (isPipelined()) { - pipeline.flushdb(); + pipeline(pipeline.flushdb()); return; } client.flushdb(); @@ -207,7 +228,7 @@ public class SrpConnection implements RedisConnection { public void flushAll() { try { if (isPipelined()) { - pipeline.flushall(); + pipeline(pipeline.flushall()); return; } client.flushall(); @@ -220,7 +241,7 @@ public class SrpConnection implements RedisConnection { public void bgSave() { try { if (isPipelined()) { - pipeline.bgsave(); + pipeline(pipeline.bgsave()); return; } client.bgsave(); @@ -233,7 +254,7 @@ public class SrpConnection implements RedisConnection { public void bgWriteAof() { try { if (isPipelined()) { - pipeline.bgrewriteaof(); + pipeline(pipeline.bgrewriteaof()); return; } client.bgrewriteaof(); @@ -246,7 +267,7 @@ public class SrpConnection implements RedisConnection { public void save() { try { if (isPipelined()) { - pipeline.save(); + pipeline(pipeline.save()); return; } client.save(); @@ -259,7 +280,7 @@ public class SrpConnection implements RedisConnection { public List getConfig(String param) { try { if (isPipelined()) { - pipeline.config_get(param); + pipeline(pipeline.config_get(param)); return null; } return Collections.singletonList(client.config_get(param).toString()); @@ -272,7 +293,7 @@ public class SrpConnection implements RedisConnection { public Properties info() { try { if (isPipelined()) { - pipeline.info(); + pipeline(pipeline.info()); return null; } return SrpUtils.info(client.info()); @@ -285,7 +306,7 @@ public class SrpConnection implements RedisConnection { public Long lastSave() { try { if (isPipelined()) { - pipeline.lastsave(); + pipeline(pipeline.lastsave()); return null; } return client.lastsave().data(); @@ -298,7 +319,7 @@ public class SrpConnection implements RedisConnection { public void setConfig(String param, String value) { try { if (isPipelined()) { - pipeline.config_set(param, value); + pipeline(pipeline.config_set(param, value)); return; } client.config_set(param, value); @@ -312,7 +333,7 @@ public class SrpConnection implements RedisConnection { public void resetConfigStats() { try { if (isPipelined()) { - pipeline.config_resetstat(); + pipeline(pipeline.config_resetstat()); return; } client.config_resetstat(); @@ -326,7 +347,7 @@ public class SrpConnection implements RedisConnection { byte[] save = "SAVE".getBytes(Charsets.UTF_8); try { if (isPipelined()) { - pipeline.shutdown(save, null); + pipeline(pipeline.shutdown(save, null)); return; } client.shutdown(save, null); @@ -339,7 +360,7 @@ public class SrpConnection implements RedisConnection { public byte[] echo(byte[] message) { try { if (isPipelined()) { - pipeline.echo(message); + pipeline(pipeline.echo(message)); return null; } return client.echo(message).data(); @@ -352,7 +373,7 @@ public class SrpConnection implements RedisConnection { public String ping() { try { if (isPipelined()) { - pipeline.ping(); + pipeline(pipeline.ping()); } return client.ping().data(); } catch (Exception ex) { @@ -364,7 +385,7 @@ public class SrpConnection implements RedisConnection { public Long del(byte[]... keys) { try { if (isPipelined()) { - pipeline.del((Object[]) keys); + pipeline(pipeline.del((Object[]) keys)); return null; } return client.del((Object[]) keys).data(); @@ -405,7 +426,7 @@ public class SrpConnection implements RedisConnection { public Boolean exists(byte[] key) { try { if (isPipelined()) { - pipeline.exists(key); + pipeline(pipeline.exists(key)); return null; } return client.exists(key).data() == 1; @@ -418,7 +439,7 @@ public class SrpConnection implements RedisConnection { public Boolean expire(byte[] key, long seconds) { try { if (isPipelined()) { - pipeline.expire(key, seconds); + pipeline(pipeline.expire(key, seconds)); return null; } return client.expire(key, seconds).data() == 1; @@ -431,7 +452,7 @@ public class SrpConnection implements RedisConnection { public Boolean expireAt(byte[] key, long unixTime) { try { if (isPipelined()) { - pipeline.expireat(key, unixTime); + pipeline(pipeline.expireat(key, unixTime)); return null; } return client.expireat(key, unixTime).data() == 1; @@ -444,7 +465,7 @@ public class SrpConnection implements RedisConnection { public Set keys(byte[] pattern) { try { if (isPipelined()) { - pipeline.keys(pattern); + pipeline(pipeline.keys(pattern)); return null; } return SrpUtils.toSet(client.keys(pattern).data()); @@ -475,7 +496,7 @@ public class SrpConnection implements RedisConnection { public Boolean persist(byte[] key) { try { if (isPipelined()) { - pipeline.persist(key); + pipeline(pipeline.persist(key)); return null; } return client.persist(key).data() == 1; @@ -488,7 +509,7 @@ public class SrpConnection implements RedisConnection { public Boolean move(byte[] key, int dbIndex) { try { if (isPipelined()) { - pipeline.move(key, dbIndex); + pipeline(pipeline.move(key, dbIndex)); return null; } return client.move(key, dbIndex).data() == 1; @@ -501,7 +522,7 @@ public class SrpConnection implements RedisConnection { public byte[] randomKey() { try { if (isPipelined()) { - pipeline.randomkey(); + pipeline(pipeline.randomkey()); return null; } return client.randomkey().data(); @@ -514,7 +535,7 @@ public class SrpConnection implements RedisConnection { public void rename(byte[] oldName, byte[] newName) { try { if (isPipelined()) { - pipeline.rename(oldName, newName); + pipeline(pipeline.rename(oldName, newName)); return; } client.rename(oldName, newName); @@ -527,7 +548,7 @@ public class SrpConnection implements RedisConnection { public Boolean renameNX(byte[] oldName, byte[] newName) { try { if (isPipelined()) { - pipeline.renamenx(oldName, newName); + pipeline(pipeline.renamenx(oldName, newName)); return null; } return (client.renamenx(oldName, newName).data() == 1); @@ -552,7 +573,7 @@ public class SrpConnection implements RedisConnection { public Long ttl(byte[] key) { try { if (isPipelined()) { - pipeline.ttl(key); + pipeline(pipeline.ttl(key)); return null; } return client.ttl(key).data(); @@ -565,7 +586,7 @@ public class SrpConnection implements RedisConnection { public DataType type(byte[] key) { try { if (isPipelined()) { - pipeline.type(key); + pipeline(pipeline.type(key)); return null; } return DataType.fromCode(client.type(key).data()); @@ -590,7 +611,7 @@ public class SrpConnection implements RedisConnection { } try { if (isPipelined()) { - pipeline.watch((Object[]) keys); + pipeline(pipeline.watch((Object[]) keys)); } else { client.watch((Object[]) keys); @@ -608,7 +629,7 @@ public class SrpConnection implements RedisConnection { public byte[] get(byte[] key) { try { if (isPipelined()) { - pipeline.get(key); + pipeline(pipeline.get(key)); return null; } @@ -622,7 +643,7 @@ public class SrpConnection implements RedisConnection { public void set(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.set(key, value); + pipeline(pipeline.set(key, value)); return; } client.set(key, value); @@ -636,7 +657,7 @@ public class SrpConnection implements RedisConnection { public byte[] getSet(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.getset(key, value); + pipeline(pipeline.getset(key, value)); return null; } return client.getset(key, value).data(); @@ -649,7 +670,7 @@ public class SrpConnection implements RedisConnection { public Long append(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.append(key, value); + pipeline(pipeline.append(key, value)); return null; } return client.append(key, value).data(); @@ -662,7 +683,7 @@ public class SrpConnection implements RedisConnection { public List mGet(byte[]... keys) { try { if (isPipelined()) { - pipeline.mget((Object[]) keys); + pipeline(pipeline.mget((Object[]) keys)); return null; } return SrpUtils.toBytesList(client.mget((Object[]) keys).data()); @@ -675,7 +696,7 @@ public class SrpConnection implements RedisConnection { public void mSet(Map tuples) { try { if (isPipelined()) { - pipeline.mset((Object[]) SrpUtils.convert(tuples)); + pipeline(pipeline.mset((Object[]) SrpUtils.convert(tuples))); return; } client.mset((Object[]) SrpUtils.convert(tuples)); @@ -688,7 +709,7 @@ public class SrpConnection implements RedisConnection { public void mSetNX(Map tuples) { try { if (isPipelined()) { - pipeline.msetnx((Object[]) SrpUtils.convert(tuples)); + pipeline(pipeline.msetnx((Object[]) SrpUtils.convert(tuples))); return; } client.msetnx((Object[]) SrpUtils.convert(tuples)); @@ -701,7 +722,7 @@ public class SrpConnection implements RedisConnection { public void setEx(byte[] key, long time, byte[] value) { try { if (isPipelined()) { - pipeline.setex(key, time, value); + pipeline(pipeline.setex(key, time, value)); return; } client.setex(key, time, value); @@ -714,7 +735,7 @@ public class SrpConnection implements RedisConnection { public Boolean setNX(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.setnx(key, value); + pipeline(pipeline.setnx(key, value)); return null; } return client.setnx(key, value).data() == 1; @@ -727,7 +748,7 @@ public class SrpConnection implements RedisConnection { public byte[] getRange(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.getrange(key, start, end); + pipeline(pipeline.getrange(key, start, end)); return null; } return client.getrange(key, start, end).data(); @@ -740,7 +761,7 @@ public class SrpConnection implements RedisConnection { public Long decr(byte[] key) { try { if (isPipelined()) { - pipeline.decr(key); + pipeline(pipeline.decr(key)); return null; } return client.decr(key).data(); @@ -753,7 +774,7 @@ public class SrpConnection implements RedisConnection { public Long decrBy(byte[] key, long value) { try { if (isPipelined()) { - pipeline.decrby(key, value); + pipeline(pipeline.decrby(key, value)); return null; } return client.decrby(key, value).data(); @@ -766,7 +787,7 @@ public class SrpConnection implements RedisConnection { public Long incr(byte[] key) { try { if (isPipelined()) { - pipeline.incr(key); + pipeline(pipeline.incr(key)); return null; } return client.incr(key).data(); @@ -779,7 +800,7 @@ public class SrpConnection implements RedisConnection { public Long incrBy(byte[] key, long value) { try { if (isPipelined()) { - pipeline.incrby(key, value); + pipeline(pipeline.incrby(key, value)); return null; } return client.incrby(key, value).data(); @@ -837,7 +858,7 @@ public class SrpConnection implements RedisConnection { public Long strLen(byte[] key) { try { if (isPipelined()) { - pipeline.strlen(key); + pipeline(pipeline.strlen(key)); return null; } return client.strlen(key).data(); @@ -854,7 +875,7 @@ public class SrpConnection implements RedisConnection { public Long lPush(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.lpush(key, new Object[] { value }); + pipeline(pipeline.lpush(key, new Object[] { value })); return null; } return client.lpush(key, new Object[] { value }).data(); @@ -867,7 +888,7 @@ public class SrpConnection implements RedisConnection { public Long rPush(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.rpush(key, new Object[] { value }); + pipeline(pipeline.rpush(key, new Object[] { value })); return null; } return client.rpush(key, new Object[] { value }).data(); @@ -880,7 +901,7 @@ public class SrpConnection implements RedisConnection { public List bLPop(int timeout, byte[]... keys) { try { if (isPipelined()) { - // pipeline.blpop(timeout, keys); + // pipeline(pipeline.blpop(timeout, keys)); return null; } // return SrpUtils.toBytesList(client.blpop(timeout, keys).data()); @@ -894,7 +915,7 @@ public class SrpConnection implements RedisConnection { public List bRPop(int timeout, byte[]... keys) { try { if (isPipelined()) { - // pipeline.brpop(timeout, keys); + // pipeline(pipeline.brpop(timeout, keys)); return null; } // return SrpUtils.toBytesList(client.brpop(timeout, keys).data()); @@ -908,7 +929,7 @@ public class SrpConnection implements RedisConnection { public byte[] lIndex(byte[] key, long index) { try { if (isPipelined()) { - pipeline.lindex(key, index); + pipeline(pipeline.lindex(key, index)); return null; } return client.lindex(key, index).data(); @@ -921,7 +942,7 @@ public class SrpConnection implements RedisConnection { public Long lInsert(byte[] key, Position where, byte[] pivot, byte[] value) { try { if (isPipelined()) { - pipeline.linsert(key, SrpUtils.convertPosition(where), pivot, value); + pipeline(pipeline.linsert(key, SrpUtils.convertPosition(where), pivot, value)); return null; } return client.linsert(key, SrpUtils.convertPosition(where), pivot, value).data(); @@ -934,7 +955,7 @@ public class SrpConnection implements RedisConnection { public Long lLen(byte[] key) { try { if (isPipelined()) { - pipeline.llen(key); + pipeline(pipeline.llen(key)); return null; } return client.llen(key).data(); @@ -947,7 +968,7 @@ public class SrpConnection implements RedisConnection { public byte[] lPop(byte[] key) { try { if (isPipelined()) { - pipeline.lpop(key); + pipeline(pipeline.lpop(key)); return null; } return client.lpop(key).data(); @@ -960,7 +981,7 @@ public class SrpConnection implements RedisConnection { public List lRange(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.lrange(key, start, end); + pipeline(pipeline.lrange(key, start, end)); return null; } return SrpUtils.toBytesList(client.lrange(key, start, end).data()); @@ -973,7 +994,7 @@ public class SrpConnection implements RedisConnection { public Long lRem(byte[] key, long count, byte[] value) { try { if (isPipelined()) { - pipeline.lrem(key, count, value); + pipeline(pipeline.lrem(key, count, value)); return null; } return client.lrem(key, count, value).data(); @@ -986,7 +1007,7 @@ public class SrpConnection implements RedisConnection { public void lSet(byte[] key, long index, byte[] value) { try { if (isPipelined()) { - pipeline.lset(key, index, value); + pipeline(pipeline.lset(key, index, value)); return; } client.lset(key, index, value); @@ -999,7 +1020,7 @@ public class SrpConnection implements RedisConnection { public void lTrim(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.ltrim(key, start, end); + pipeline(pipeline.ltrim(key, start, end)); return; } client.ltrim(key, start, end); @@ -1012,7 +1033,7 @@ public class SrpConnection implements RedisConnection { public byte[] rPop(byte[] key) { try { if (isPipelined()) { - pipeline.rpop(key); + pipeline(pipeline.rpop(key)); return null; } return client.rpop(key).data(); @@ -1025,7 +1046,7 @@ public class SrpConnection implements RedisConnection { public byte[] rPopLPush(byte[] srcKey, byte[] dstKey) { try { if (isPipelined()) { - pipeline.rpoplpush(srcKey, dstKey); + pipeline(pipeline.rpoplpush(srcKey, dstKey)); return null; } return client.rpoplpush(srcKey, dstKey).data(); @@ -1038,7 +1059,7 @@ public class SrpConnection implements RedisConnection { public byte[] bRPopLPush(int timeout, byte[] srcKey, byte[] dstKey) { try { if (isPipelined()) { - pipeline.brpoplpush(srcKey, dstKey, timeout); + pipeline(pipeline.brpoplpush(srcKey, dstKey, timeout)); return null; } return client.brpoplpush(srcKey, dstKey, timeout).data(); @@ -1051,7 +1072,7 @@ public class SrpConnection implements RedisConnection { public Long lPushX(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.lpushx(key, value); + pipeline(pipeline.lpushx(key, value)); return null; } return client.lpushx(key, value).data(); @@ -1064,7 +1085,7 @@ public class SrpConnection implements RedisConnection { public Long rPushX(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.rpushx(key, value); + pipeline(pipeline.rpushx(key, value)); return null; } return client.rpushx(key, value).data(); @@ -1082,7 +1103,7 @@ public class SrpConnection implements RedisConnection { public Boolean sAdd(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.sadd(key, new Object[] { value }); + pipeline(pipeline.sadd(key, new Object[] { value })); return null; } return (client.sadd(key, new Object[] { value }).data() == 1); @@ -1095,7 +1116,7 @@ public class SrpConnection implements RedisConnection { public Long sCard(byte[] key) { try { if (isPipelined()) { - pipeline.scard(key); + pipeline(pipeline.scard(key)); return null; } return client.scard(key).data(); @@ -1108,7 +1129,7 @@ public class SrpConnection implements RedisConnection { public Set sDiff(byte[]... keys) { try { if (isPipelined()) { - pipeline.sdiff((Object[]) keys); + pipeline(pipeline.sdiff((Object[]) keys)); return null; } return SrpUtils.toSet(client.sdiff((Object[]) keys).data()); @@ -1121,7 +1142,7 @@ public class SrpConnection implements RedisConnection { public Long sDiffStore(byte[] destKey, byte[]... keys) { try { if (isPipelined()) { - pipeline.sdiffstore(destKey, (Object[]) keys); + pipeline(pipeline.sdiffstore(destKey, (Object[]) keys)); return null; } return client.sdiffstore(destKey, (Object[]) keys).data(); @@ -1134,7 +1155,7 @@ public class SrpConnection implements RedisConnection { public Set sInter(byte[]... keys) { try { if (isPipelined()) { - pipeline.sinter((Object[]) keys); + pipeline(pipeline.sinter((Object[]) keys)); return null; } return SrpUtils.toSet(client.sinter((Object[]) keys).data()); @@ -1147,7 +1168,7 @@ public class SrpConnection implements RedisConnection { public Long sInterStore(byte[] destKey, byte[]... keys) { try { if (isPipelined()) { - pipeline.sinterstore(destKey, (Object[]) keys); + pipeline(pipeline.sinterstore(destKey, (Object[]) keys)); return null; } return client.sinterstore(destKey, (Object[]) keys).data(); @@ -1160,7 +1181,7 @@ public class SrpConnection implements RedisConnection { public Boolean sIsMember(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.sismember(key, value); + pipeline(pipeline.sismember(key, value)); return null; } return client.sismember(key, value).data() == 1; @@ -1173,7 +1194,7 @@ public class SrpConnection implements RedisConnection { public Set sMembers(byte[] key) { try { if (isPipelined()) { - pipeline.smembers(key); + pipeline(pipeline.smembers(key)); return null; } return SrpUtils.toSet(client.smembers(key).data()); @@ -1186,7 +1207,7 @@ public class SrpConnection implements RedisConnection { public Boolean sMove(byte[] srcKey, byte[] destKey, byte[] value) { try { if (isPipelined()) { - pipeline.smove(srcKey, destKey, value); + pipeline(pipeline.smove(srcKey, destKey, value)); return null; } return client.smove(srcKey, destKey, value).data() == 1; @@ -1199,7 +1220,7 @@ public class SrpConnection implements RedisConnection { public byte[] sPop(byte[] key) { try { if (isPipelined()) { - pipeline.spop(key); + pipeline(pipeline.spop(key)); return null; } return client.spop(key).data(); @@ -1212,7 +1233,7 @@ public class SrpConnection implements RedisConnection { public byte[] sRandMember(byte[] key) { try { if (isPipelined()) { - pipeline.srandmember(key); + pipeline(pipeline.srandmember(key)); return null; } return client.srandmember(key).data(); @@ -1225,7 +1246,7 @@ public class SrpConnection implements RedisConnection { public Boolean sRem(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.srem(key, new Object[] { value }); + pipeline(pipeline.srem(key, new Object[] { value })); return null; } return client.srem(key, new Object[] { value }).data() == 1; @@ -1238,7 +1259,7 @@ public class SrpConnection implements RedisConnection { public Set sUnion(byte[]... keys) { try { if (isPipelined()) { - pipeline.sunion((Object[]) keys); + pipeline(pipeline.sunion((Object[]) keys)); return null; } return SrpUtils.toSet(client.sunion((Object[]) keys).data()); @@ -1251,7 +1272,7 @@ public class SrpConnection implements RedisConnection { public Long sUnionStore(byte[] destKey, byte[]... keys) { try { if (isPipelined()) { - pipeline.sunionstore(destKey, (Object[]) keys); + pipeline(pipeline.sunionstore(destKey, (Object[]) keys)); return null; } return client.sunionstore(destKey, (Object[]) keys).data(); @@ -1268,7 +1289,7 @@ public class SrpConnection implements RedisConnection { public Boolean zAdd(byte[] key, double score, byte[] value) { try { if (isPipelined()) { - pipeline.zadd(new Object[] { key, score, value }); + pipeline(pipeline.zadd(new Object[] { key, score, value })); return null; } return client.zadd(new Object[] { key, score, value }).data() == 1; @@ -1281,7 +1302,7 @@ public class SrpConnection implements RedisConnection { public Long zCard(byte[] key) { try { if (isPipelined()) { - pipeline.zcard(key); + pipeline(pipeline.zcard(key)); return null; } return client.zcard(key).data(); @@ -1294,7 +1315,7 @@ public class SrpConnection implements RedisConnection { public Long zCount(byte[] key, double min, double max) { try { if (isQueueing()) { - pipeline.zcount(key, min, max); + pipeline(pipeline.zcount(key, min, max)); return null; } return client.zcount(key, min, max).data(); @@ -1307,7 +1328,7 @@ public class SrpConnection implements RedisConnection { public Double zIncrBy(byte[] key, double increment, byte[] value) { try { if (isPipelined()) { - pipeline.zincrby(key, increment, value); + pipeline(pipeline.zincrby(key, increment, value)); return null; } return SrpUtils.toDouble(client.zincrby(key, increment, value).data()); @@ -1335,7 +1356,7 @@ public class SrpConnection implements RedisConnection { try { if (isQueueing()) { - pipeline.zinterstore(args); + pipeline(pipeline.zinterstore(args)); return null; } return client.zinterstore(args).data(); @@ -1347,7 +1368,7 @@ public class SrpConnection implements RedisConnection { public Set zRange(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.zrange(key, start, end, null); + pipeline(pipeline.zrange(key, start, end, null)); return null; } return SrpUtils.toSet(client.zrange(key, start, end, null).data()); @@ -1360,7 +1381,7 @@ public class SrpConnection implements RedisConnection { public Set zRangeWithScores(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.zrange(key, start, end, SrpUtils.WITHSCORES); + pipeline(pipeline.zrange(key, start, end, SrpUtils.WITHSCORES)); return null; } return SrpUtils.convertTuple(client.zrange(key, start, end, SrpUtils.WITHSCORES)); @@ -1373,7 +1394,7 @@ public class SrpConnection implements RedisConnection { public Set zRangeByScore(byte[] key, double min, double max) { try { if (isPipelined()) { - pipeline.zrangebyscore(key, min, max, null, null); + pipeline(pipeline.zrangebyscore(key, min, max, null, null)); return null; } return SrpUtils.toSet(client.zrangebyscore(key, min, max, null, null).data()); @@ -1386,7 +1407,7 @@ public class SrpConnection implements RedisConnection { public Set zRangeByScoreWithScores(byte[] key, double min, double max) { try { if (isPipelined()) { - pipeline.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, null); + pipeline(pipeline.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, null)); return null; } return SrpUtils.convertTuple(client.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, null)); @@ -1399,7 +1420,7 @@ public class SrpConnection implements RedisConnection { public Set zRevRangeWithScores(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.zrevrange(key, start, end, SrpUtils.WITHSCORES); + pipeline(pipeline.zrevrange(key, start, end, SrpUtils.WITHSCORES)); return null; } return SrpUtils.convertTuple(client.zrevrange(key, start, end, SrpUtils.WITHSCORES)); @@ -1413,7 +1434,7 @@ public class SrpConnection implements RedisConnection { try { byte[] limit = SrpUtils.limit(offset, count); if (isPipelined()) { - pipeline.zrangebyscore(key, min, max, null, limit); + pipeline(pipeline.zrangebyscore(key, min, max, null, limit)); return null; } return SrpUtils.toSet(client.zrangebyscore(key, min, max, null, limit).data()); @@ -1427,7 +1448,7 @@ public class SrpConnection implements RedisConnection { try { byte[] limit = SrpUtils.limit(offset, count); if (isPipelined()) { - pipeline.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, limit); + pipeline(pipeline.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, limit)); return null; } return SrpUtils.convertTuple(client.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, limit)); @@ -1504,7 +1525,7 @@ public class SrpConnection implements RedisConnection { public Boolean zRem(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.zrem(key, new Object[] { value }); + pipeline(pipeline.zrem(key, new Object[] { value })); return null; } return client.zrem(key, new Object[] { value }).data() == 1; @@ -1517,7 +1538,7 @@ public class SrpConnection implements RedisConnection { public Long zRemRange(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.zremrangebyrank(key, start, end); + pipeline(pipeline.zremrangebyrank(key, start, end)); return null; } return client.zremrangebyrank(key, start, end).data(); @@ -1530,7 +1551,7 @@ public class SrpConnection implements RedisConnection { public Long zRemRangeByScore(byte[] key, double min, double max) { try { if (isPipelined()) { - pipeline.zremrangebyscore(key, min, max); + pipeline(pipeline.zremrangebyscore(key, min, max)); return null; } return client.zremrangebyscore(key, min, max).data(); @@ -1543,7 +1564,7 @@ public class SrpConnection implements RedisConnection { public Set zRevRange(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.zrevrange(key, start, end, null); + pipeline(pipeline.zrevrange(key, start, end, null)); return null; } return SrpUtils.toSet(client.zrevrange(key, start, end, null).data()); @@ -1556,7 +1577,7 @@ public class SrpConnection implements RedisConnection { public Long zRevRank(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.zrevrank(key, value); + pipeline(pipeline.zrevrank(key, value)); return null; } return (Long) client.zrevrank(key, value).data(); @@ -1569,7 +1590,7 @@ public class SrpConnection implements RedisConnection { public Double zScore(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.zscore(key, value); + pipeline(pipeline.zscore(key, value)); return null; } return SrpUtils.toDouble(client.zscore(key, value).data()); @@ -1587,7 +1608,7 @@ public class SrpConnection implements RedisConnection { public Long zUnionStore(byte[] destKey, byte[]... sets) { try { if (isPipelined()) { - pipeline.zunionstore(destKey, sets.length, (Object[]) sets); + pipeline(pipeline.zunionstore(destKey, sets.length, (Object[]) sets)); return null; } return client.zunionstore(destKey, sets.length, (Object[]) sets).data(); @@ -1604,7 +1625,7 @@ public class SrpConnection implements RedisConnection { public Boolean hSet(byte[] key, byte[] field, byte[] value) { try { if (isPipelined()) { - pipeline.hset(key, field, value); + pipeline(pipeline.hset(key, field, value)); return null; } return client.hset(key, field, value).data() == 1; @@ -1617,7 +1638,7 @@ public class SrpConnection implements RedisConnection { public Boolean hSetNX(byte[] key, byte[] field, byte[] value) { try { if (isPipelined()) { - pipeline.hsetnx(key, field, value); + pipeline(pipeline.hsetnx(key, field, value)); return null; } return client.hsetnx(key, field, value).data() == 1; @@ -1630,7 +1651,7 @@ public class SrpConnection implements RedisConnection { public Boolean hDel(byte[] key, byte[] field) { try { if (isPipelined()) { - pipeline.hdel(key, new Object[] { field }); + pipeline(pipeline.hdel(key, new Object[] { field })); return null; } return client.hdel(key, new Object[] { field }).data() == 1; @@ -1643,7 +1664,7 @@ public class SrpConnection implements RedisConnection { public Boolean hExists(byte[] key, byte[] field) { try { if (isPipelined()) { - pipeline.hexists(key, field); + pipeline(pipeline.hexists(key, field)); return null; } return client.hexists(key, field).data() == 1; @@ -1656,7 +1677,7 @@ public class SrpConnection implements RedisConnection { public byte[] hGet(byte[] key, byte[] field) { try { if (isPipelined()) { - pipeline.hget(key, field); + pipeline(pipeline.hget(key, field)); return null; } return client.hget(key, field).data(); @@ -1669,7 +1690,7 @@ public class SrpConnection implements RedisConnection { public Map hGetAll(byte[] key) { try { if (isPipelined()) { - pipeline.hgetall(key); + pipeline(pipeline.hgetall(key)); return null; } return SrpUtils.toMap(client.hgetall(key).data()); @@ -1682,7 +1703,7 @@ public class SrpConnection implements RedisConnection { public Long hIncrBy(byte[] key, byte[] field, long delta) { try { if (isPipelined()) { - pipeline.hincrby(key, field, delta); + pipeline(pipeline.hincrby(key, field, delta)); return null; } return client.hincrby(key, field, delta).data(); @@ -1695,7 +1716,7 @@ public class SrpConnection implements RedisConnection { public Set hKeys(byte[] key) { try { if (isPipelined()) { - pipeline.hkeys(key); + pipeline(pipeline.hkeys(key)); return null; } return SrpUtils.toSet(client.hkeys(key).data()); @@ -1708,7 +1729,7 @@ public class SrpConnection implements RedisConnection { public Long hLen(byte[] key) { try { if (isPipelined()) { - pipeline.hlen(key); + pipeline(pipeline.hlen(key)); return null; } return client.hlen(key).data(); @@ -1721,7 +1742,7 @@ public class SrpConnection implements RedisConnection { public List hMGet(byte[] key, byte[]... fields) { try { if (isPipelined()) { - pipeline.hmget(key, (Object[]) fields); + pipeline(pipeline.hmget(key, (Object[]) fields)); return null; } return SrpUtils.toBytesList(client.hmget(key, (Object[]) fields).data()); @@ -1734,7 +1755,7 @@ public class SrpConnection implements RedisConnection { public void hMSet(byte[] key, Map tuple) { try { if (isPipelined()) { - pipeline.hmset(key, SrpUtils.convert(tuple)); + pipeline(pipeline.hmset(key, SrpUtils.convert(tuple))); return; } client.hmset(key, SrpUtils.convert(tuple)); @@ -1747,7 +1768,7 @@ public class SrpConnection implements RedisConnection { public List hVals(byte[] key) { try { if (isPipelined()) { - pipeline.hvals(key); + pipeline(pipeline.hvals(key)); return null; } return SrpUtils.toBytesList(client.hvals(key).data()); @@ -1767,7 +1788,7 @@ public class SrpConnection implements RedisConnection { throw new UnsupportedOperationException(); } if (isPipelined()) { - pipeline.publish(channel, message); + pipeline(pipeline.publish(channel, message)); return null; } return client.publish(channel, message).data(); @@ -1828,4 +1849,9 @@ public class SrpConnection implements RedisConnection { "Connection already subscribed; use the connection Subscription to cancel or add new channels"); } } + + // processing method that adds a listener to the future in order to track down the results and close the pipeline + private void pipeline(ListenableFuture future) { + callback.addCommand(future); + } } \ No newline at end of file