diff --git a/src/main/java/org/springframework/data/redis/connection/RedisConnection.java b/src/main/java/org/springframework/data/redis/connection/RedisConnection.java index 5e23602ef..7d69efacf 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisConnection.java @@ -93,7 +93,8 @@ public interface RedisConnection extends RedisCommands { * Executes the commands in the pipeline and returns their result. * If the connection is not pipelined, an empty collection is returned. * + * @throws RedisPipelineException if the pipeline contains any incorrect/invalid statements * @return the result of the executed commands. */ - List closePipeline(); + List closePipeline() throws RedisPipelineException; } \ No newline at end of file diff --git a/src/main/java/org/springframework/data/redis/connection/RedisPipelineException.java b/src/main/java/org/springframework/data/redis/connection/RedisPipelineException.java new file mode 100644 index 000000000..76f10c4d3 --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/RedisPipelineException.java @@ -0,0 +1,90 @@ +/* + * Copyright 2011-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.data.redis.connection; + +import java.util.Collections; +import java.util.List; + +import org.springframework.dao.InvalidDataAccessResourceUsageException; + +/** + * Exception thrown when executing/closing a pipeline that contains one or multiple invalid/incorrect statements. + * The exception might also contain the pipeline result (if the driver returns it), allowing for analysis and tracing. + *

+ * Typically, the first exception returned by the pipeline is used as the cause of this exception for easier + * debugging. + * + * @author Costin Leau + */ +public class RedisPipelineException extends InvalidDataAccessResourceUsageException { + + private final List results; + + /** + * Constructs a new RedisPipelineException instance. + * + * @param msg the message + * @param cause the cause + * @param pipelineResult the pipeline result + */ + public RedisPipelineException(String msg, Throwable cause, List pipelineResult) { + super(msg, cause); + results = Collections.unmodifiableList(pipelineResult); + } + + /** + * Constructs a new RedisPipelineException instance using a default message. + * + * @param cause the cause + * @param pipelineResult the pipeline result + */ + public RedisPipelineException(Exception cause, List pipelineResult) { + this("Pipeline contained one or more invalid commands", cause, pipelineResult); + } + + /** + * Constructs a new RedisPipelineException instance using a default message + * and an empty pipeline result list. + * + * @param cause the cause + */ + public RedisPipelineException(Exception cause) { + this("Pipeline contained one or more invalid commands", cause, Collections.emptyList()); + } + + /** + * Constructs a new RedisPipelineException instance. + * + * @param msg message + * @param pipelineResult pipeline partial results + */ + public RedisPipelineException(String msg, List pipelineResult) { + super(msg); + results = Collections.unmodifiableList(pipelineResult); + } + + /** + * Optionally returns the result of the pipeline that caused the exception. + * Typically contains both the results of the successful statements but also + * the exceptions of the incorrect ones. + * + * @return result of the pipeline + */ + public List getPipelineResult() { + return results; + } +} \ No newline at end of file diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java index 746f6178d..5bbb6d820 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java @@ -30,6 +30,7 @@ import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisPipelineException; import org.springframework.data.redis.connection.RedisSubscribedConnectionException; import org.springframework.data.redis.connection.SortParameters; import org.springframework.data.redis.connection.Subscription; @@ -68,8 +69,8 @@ public class JedisConnection implements RedisConnection { static { CLIENT_FIELD = ReflectionUtils.findField(BinaryJedis.class, "client", Client.class); ReflectionUtils.makeAccessible(CLIENT_FIELD); - SEND_COMMAND = ReflectionUtils.findMethod(Connection.class, "sendCommand", - new Class[] { Command.class, byte[][].class }); + SEND_COMMAND = ReflectionUtils.findMethod(Connection.class, "sendCommand", new Class[] { Command.class, + byte[][].class }); ReflectionUtils.makeAccessible(SEND_COMMAND); GET_RESPONSE = ReflectionUtils.findMethod(Queable.class, "getResponse", Builder.class); ReflectionUtils.makeAccessible(GET_RESPONSE); @@ -135,27 +136,33 @@ public class JedisConnection implements RedisConnection { public Object execute(String command, byte[]... args) { Assert.hasText(command, "a valid command needs to be specified"); - List mArgs = new ArrayList(); - if (!ObjectUtils.isEmpty(args)) { - Collections.addAll(mArgs, args); + try { + List mArgs = new ArrayList(); + if (!ObjectUtils.isEmpty(args)) { + Collections.addAll(mArgs, args); + } + + Object result = ReflectionUtils.invokeMethod(SEND_COMMAND, client, + Command.valueOf(command.trim().toUpperCase()), mArgs.toArray(new byte[mArgs.size()][])); + if (isQueueing() || isPipelined()) { + Object target = (isPipelined() ? pipeline : transaction); + ReflectionUtils.invokeMethod(GET_RESPONSE, target, new Builder() { + public Object build(Object data) { + return data; + } + + public String toString() { + return "Object"; + } + }); + } + else { + client.getOne(); + } + return result; + } catch (Exception ex) { + throw convertJedisAccessException(ex); } - - Object result = ReflectionUtils.invokeMethod(SEND_COMMAND, client, - Command.valueOf(command.trim().toUpperCase()), mArgs.toArray(new byte[mArgs.size()][])); - if (isQueueing() || isPipelined()) { - Object target = (isPipelined() ? pipeline : transaction); - ReflectionUtils.invokeMethod(GET_RESPONSE, target, new Builder() { - public Object build(Object data) { - return data; - } - - public String toString() { - return "Object"; - } - }); - } - - return result; } public void close() throws DataAccessException { @@ -230,15 +237,30 @@ public class JedisConnection implements RedisConnection { @SuppressWarnings("unchecked") public List closePipeline() { if (pipeline != null) { - List execute = pipeline.syncAndReturnAll(); + List execute = pipeline.syncAndReturnAll(); if (execute != null && !execute.isEmpty()) { + Exception cause = null; + for (int i = 0; i < execute.size(); i++) { + Object object = execute.get(i); + if (object instanceof Exception) { + DataAccessException dataAccessException = convertJedisAccessException((Exception) object); + if (cause == null) { + cause = dataAccessException; + } + execute.set(i, dataAccessException); + } + } + + if (cause != null) { + throw new RedisPipelineException(cause, execute); + } + return execute; } } return Collections.emptyList(); } - public List sort(byte[] key, SortParameters params) { SortingParams sortParams = JedisUtils.convertSortParams(params); diff --git a/src/main/java/org/springframework/data/redis/connection/jredis/JredisConnection.java b/src/main/java/org/springframework/data/redis/connection/jredis/JredisConnection.java index f0cdc25d1..d99e7dbe1 100644 --- a/src/main/java/org/springframework/data/redis/connection/jredis/JredisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jredis/JredisConnection.java @@ -89,14 +89,17 @@ public class JredisConnection implements RedisConnection { public Object execute(String command, byte[]... args) { Assert.hasText(command, "a valid command needs to be specified"); - List mArgs = new ArrayList(); - if (!ObjectUtils.isEmpty(args)) { - Collections.addAll(mArgs, args); + try { + List mArgs = new ArrayList(); + if (!ObjectUtils.isEmpty(args)) { + Collections.addAll(mArgs, args); + } + + return ReflectionUtils.invokeMethod(SERVICE_REQUEST, jredis, Command.valueOf(command.trim().toUpperCase()), + mArgs.toArray(new byte[mArgs.size()][])); + } catch (Exception ex) { + throw convertJredisAccessException(ex); } - - return ReflectionUtils.invokeMethod(SERVICE_REQUEST, jredis, Command.valueOf(command.trim().toUpperCase()), - mArgs.toArray(new byte[mArgs.size()][])); - } public void close() throws RedisSystemException { diff --git a/src/main/java/org/springframework/data/redis/connection/rjc/RjcConnection.java b/src/main/java/org/springframework/data/redis/connection/rjc/RjcConnection.java index 621dc9591..1170e859a 100644 --- a/src/main/java/org/springframework/data/redis/connection/rjc/RjcConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/rjc/RjcConnection.java @@ -34,6 +34,7 @@ import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisPipelineException; import org.springframework.data.redis.connection.RedisSubscribedConnectionException; import org.springframework.data.redis.connection.SortParameters; import org.springframework.data.redis.connection.Subscription; @@ -83,9 +84,16 @@ public class RjcConnection implements RedisConnection { public Object execute(String command, byte[]... args) { Assert.hasText(command, "a valid command needs to be specified"); - connection.sendCommand(Command.valueOf(command.trim().toUpperCase()), - (ObjectUtils.isEmpty(args) ? new byte[0][] : args)); - return connection.getAll(); + try { + connection.sendCommand(Command.valueOf(command.trim().toUpperCase()), + (ObjectUtils.isEmpty(args) ? new byte[0][] : args)); + if (!isPipelined()) { + return connection.getAll(); + } + return null; + } catch (Exception ex) { + throw convertRjcAccessException(ex); + } } public void close() throws DataAccessException { @@ -135,9 +143,10 @@ public class RjcConnection implements RedisConnection { @SuppressWarnings("unchecked") public List closePipeline() { if (pipeline != null) { - List execute = client.getAll(); - if (execute != null && !execute.isEmpty()) { - return execute; + try { + List execute = client.getAll(); + } catch (Exception ex) { + throw new RedisPipelineException(convertRjcAccessException(ex)); } } return Collections.emptyList(); diff --git a/src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java b/src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java index b7bfa2617..5fda0f75d 100644 --- a/src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.connection.srp; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -30,6 +31,7 @@ import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisPipelineException; import org.springframework.data.redis.connection.RedisSubscribedConnectionException; import org.springframework.data.redis.connection.SortParameters; import org.springframework.data.redis.connection.Subscription; @@ -42,6 +44,9 @@ import redis.client.RedisException; import redis.reply.Reply; import com.google.common.base.Charsets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; /** * {@code RedisConnection} implementation on top of spullara Redis Protocol library. @@ -56,8 +61,43 @@ public class SrpConnection implements RedisConnection { private boolean isClosed = false; private boolean isMulti = false; private Pipeline pipeline; + private PipelineTracker callback; private volatile SrpSubscription subscription; + private static class PipelineTracker implements FutureCallback { + + private final List results = Collections.synchronizedList(new ArrayList()); + private final List> futures = new ArrayList>(); + + public void onSuccess(Reply result) { + results.add(result.data()); + } + + public void onFailure(Throwable t) { + results.add(t); + } + + public List complete() { + try { + Futures.successfulAsList(futures).get(); + } catch (Exception ex) { + // ignore + } + + return results; + } + + public void addCommand(ListenableFuture future) { + futures.add(future); + Futures.addCallback(future, this); + } + + public void close() { + results.clear(); + futures.clear(); + } + } + public SrpConnection(String host, int port, BlockingQueue queue) { try { this.client = new RedisClient(host, port); @@ -67,7 +107,7 @@ public class SrpConnection implements RedisConnection { } } - protected DataAccessException convertSRAccessException(Exception ex) { + protected DataAccessException convertSrpAccessException(Exception ex) { if (ex instanceof RedisException) { return SrpUtils.convertSRedisAccessException((RedisException) ex); } @@ -75,19 +115,23 @@ public class SrpConnection implements RedisConnection { return new RedisConnectionFailureException("Redis connection failed", (IOException) ex); } - return new RedisSystemException("Unknown SRedis exception", ex); + return new RedisSystemException("Unknown SRP exception", ex); } public Object execute(String command, byte[]... args) { Assert.hasText(command, "a valid command needs to be specified"); - String name = command.trim().toUpperCase(); - Command cmd = new Command(name.getBytes(Charsets.UTF_8), args); - if (isPipelined()) { - client.pipeline(name, cmd); - return null; - } - else { - return client.execute(name, cmd); + try { + String name = command.trim().toUpperCase(); + Command cmd = new Command(name.getBytes(Charsets.UTF_8), args); + if (isPipelined()) { + pipeline(client.pipeline(name, cmd)); + return null; + } + else { + return client.execute(name, cmd); + } + } catch (RedisException ex) { + throw convertSrpAccessException(ex); } } @@ -98,7 +142,7 @@ public class SrpConnection implements RedisConnection { try { client.close(); } catch (IOException ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -122,24 +166,37 @@ public class SrpConnection implements RedisConnection { public void openPipeline() { if (pipeline == null) { + callback = new PipelineTracker(); pipeline = client.pipeline(); } } public List closePipeline() { - // if (pipeline != null) { - // //ListenableFuture reply = pipeline.exec(); - // pipeline = null; - // if (reply != null) { - // try { - // return SrpUtils.toList(reply.get().data()); - // } catch (Exception ex) { - // throw convertSRAccessException(ex); - // } - // } - // } - throw new UnsupportedOperationException(); - //return Collections.emptyList(); + if (pipeline != null) { + List execute = new ArrayList(callback.complete()); + callback.close(); + callback = null; + if (execute != null && !execute.isEmpty()) { + Exception cause = null; + for (int i = 0; i < execute.size(); i++) { + Object object = execute.get(i); + if (object instanceof Exception) { + DataAccessException dataAccessException = convertSrpAccessException((Exception) object); + if (cause == null) { + cause = dataAccessException; + } + execute.set(i, dataAccessException); + } + } + if (cause != null) { + throw new RedisPipelineException(cause, execute); + } + + return execute; + } + } + + return Collections.emptyList(); } @@ -149,12 +206,12 @@ public class SrpConnection implements RedisConnection { try { if (isPipelined()) { - pipeline.sort(key, sort, null, (Object[]) null); + pipeline(pipeline.sort(key, sort, null, (Object[]) null)); return null; } return SrpUtils.toBytesList((Reply[]) client.sort(key, sort, null, (Object[]) null).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -164,24 +221,24 @@ public class SrpConnection implements RedisConnection { try { if (isPipelined()) { - pipeline.sort(key, sort, null, (Object[]) null); + pipeline(pipeline.sort(key, sort, null, (Object[]) null)); return null; } return ((Long) client.sort(key, sort, null, (Object[]) null).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } public Long dbSize() { try { if (isPipelined()) { - pipeline.dbsize(); + pipeline(pipeline.dbsize()); return null; } return client.dbsize().data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -190,12 +247,12 @@ public class SrpConnection implements RedisConnection { public void flushDb() { try { if (isPipelined()) { - pipeline.flushdb(); + pipeline(pipeline.flushdb()); return; } client.flushdb(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -203,12 +260,12 @@ public class SrpConnection implements RedisConnection { public void flushAll() { try { if (isPipelined()) { - pipeline.flushall(); + pipeline(pipeline.flushall()); return; } client.flushall(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -216,12 +273,12 @@ public class SrpConnection implements RedisConnection { public void bgSave() { try { if (isPipelined()) { - pipeline.bgsave(); + pipeline(pipeline.bgsave()); return; } client.bgsave(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -229,12 +286,12 @@ public class SrpConnection implements RedisConnection { public void bgWriteAof() { try { if (isPipelined()) { - pipeline.bgrewriteaof(); + pipeline(pipeline.bgrewriteaof()); return; } client.bgrewriteaof(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -242,12 +299,12 @@ public class SrpConnection implements RedisConnection { public void save() { try { if (isPipelined()) { - pipeline.save(); + pipeline(pipeline.save()); return; } client.save(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -255,12 +312,12 @@ public class SrpConnection implements RedisConnection { public List getConfig(String param) { try { if (isPipelined()) { - pipeline.config_get(param); + pipeline(pipeline.config_get(param)); return null; } return Collections.singletonList(client.config_get(param).toString()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -268,12 +325,12 @@ public class SrpConnection implements RedisConnection { public Properties info() { try { if (isPipelined()) { - pipeline.info(); + pipeline(pipeline.info()); return null; } return SrpUtils.info(client.info()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -281,12 +338,12 @@ public class SrpConnection implements RedisConnection { public Long lastSave() { try { if (isPipelined()) { - pipeline.lastsave(); + pipeline(pipeline.lastsave()); return null; } return client.lastsave().data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -294,12 +351,12 @@ public class SrpConnection implements RedisConnection { public void setConfig(String param, String value) { try { if (isPipelined()) { - pipeline.config_set(param, value); + pipeline(pipeline.config_set(param, value)); return; } client.config_set(param, value); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -308,12 +365,12 @@ public class SrpConnection implements RedisConnection { public void resetConfigStats() { try { if (isPipelined()) { - pipeline.config_resetstat(); + pipeline(pipeline.config_resetstat()); return; } client.config_resetstat(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -322,12 +379,12 @@ public class SrpConnection implements RedisConnection { byte[] save = "SAVE".getBytes(Charsets.UTF_8); try { if (isPipelined()) { - pipeline.shutdown(save, null); + pipeline(pipeline.shutdown(save, null)); return; } client.shutdown(save, null); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -335,12 +392,12 @@ public class SrpConnection implements RedisConnection { public byte[] echo(byte[] message) { try { if (isPipelined()) { - pipeline.echo(message); + pipeline(pipeline.echo(message)); return null; } return client.echo(message).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -348,11 +405,11 @@ public class SrpConnection implements RedisConnection { public String ping() { try { if (isPipelined()) { - pipeline.ping(); + pipeline(pipeline.ping()); } return client.ping().data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -360,12 +417,12 @@ public class SrpConnection implements RedisConnection { public Long del(byte[]... keys) { try { if (isPipelined()) { - pipeline.del((Object[]) keys); + pipeline(pipeline.del((Object[]) keys)); return null; } return client.del((Object[]) keys).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -379,7 +436,7 @@ public class SrpConnection implements RedisConnection { client.discard(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -393,7 +450,7 @@ public class SrpConnection implements RedisConnection { } return Collections.singletonList((Object) exec); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -401,12 +458,12 @@ public class SrpConnection implements RedisConnection { public Boolean exists(byte[] key) { try { if (isPipelined()) { - pipeline.exists(key); + pipeline(pipeline.exists(key)); return null; } return client.exists(key).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -414,12 +471,12 @@ public class SrpConnection implements RedisConnection { public Boolean expire(byte[] key, long seconds) { try { if (isPipelined()) { - pipeline.expire(key, seconds); + pipeline(pipeline.expire(key, seconds)); return null; } return client.expire(key, seconds).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -427,12 +484,12 @@ public class SrpConnection implements RedisConnection { public Boolean expireAt(byte[] key, long unixTime) { try { if (isPipelined()) { - pipeline.expireat(key, unixTime); + pipeline(pipeline.expireat(key, unixTime)); return null; } return client.expireat(key, unixTime).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -440,12 +497,12 @@ public class SrpConnection implements RedisConnection { public Set keys(byte[] pattern) { try { if (isPipelined()) { - pipeline.keys(pattern); + pipeline(pipeline.keys(pattern)); return null; } return SrpUtils.toSet(client.keys(pattern).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -463,7 +520,7 @@ public class SrpConnection implements RedisConnection { } client.multi(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -471,12 +528,12 @@ public class SrpConnection implements RedisConnection { public Boolean persist(byte[] key) { try { if (isPipelined()) { - pipeline.persist(key); + pipeline(pipeline.persist(key)); return null; } return client.persist(key).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -484,12 +541,12 @@ public class SrpConnection implements RedisConnection { public Boolean move(byte[] key, int dbIndex) { try { if (isPipelined()) { - pipeline.move(key, dbIndex); + pipeline(pipeline.move(key, dbIndex)); return null; } return client.move(key, dbIndex).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -497,12 +554,12 @@ public class SrpConnection implements RedisConnection { public byte[] randomKey() { try { if (isPipelined()) { - pipeline.randomkey(); + pipeline(pipeline.randomkey()); return null; } return client.randomkey().data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -510,12 +567,12 @@ public class SrpConnection implements RedisConnection { public void rename(byte[] oldName, byte[] newName) { try { if (isPipelined()) { - pipeline.rename(oldName, newName); + pipeline(pipeline.rename(oldName, newName)); return; } client.rename(oldName, newName); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -523,12 +580,12 @@ public class SrpConnection implements RedisConnection { public Boolean renameNX(byte[] oldName, byte[] newName) { try { if (isPipelined()) { - pipeline.renamenx(oldName, newName); + pipeline(pipeline.renamenx(oldName, newName)); return null; } return (client.renamenx(oldName, newName).data() == 1); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -540,7 +597,7 @@ public class SrpConnection implements RedisConnection { } client.select(dbIndex); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -548,12 +605,12 @@ public class SrpConnection implements RedisConnection { public Long ttl(byte[] key) { try { if (isPipelined()) { - pipeline.ttl(key); + pipeline(pipeline.ttl(key)); return null; } return client.ttl(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -561,12 +618,12 @@ public class SrpConnection implements RedisConnection { public DataType type(byte[] key) { try { if (isPipelined()) { - pipeline.type(key); + pipeline(pipeline.type(key)); return null; } return DataType.fromCode(client.type(key).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -575,7 +632,7 @@ public class SrpConnection implements RedisConnection { try { client.unwatch(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -586,13 +643,13 @@ public class SrpConnection implements RedisConnection { } try { if (isPipelined()) { - pipeline.watch((Object[]) keys); + pipeline(pipeline.watch((Object[]) keys)); } else { client.watch((Object[]) keys); } } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -604,13 +661,13 @@ public class SrpConnection implements RedisConnection { public byte[] get(byte[] key) { try { if (isPipelined()) { - pipeline.get(key); + pipeline(pipeline.get(key)); return null; } return client.get(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -618,12 +675,12 @@ public class SrpConnection implements RedisConnection { public void set(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.set(key, value); + pipeline(pipeline.set(key, value)); return; } client.set(key, value); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -632,12 +689,12 @@ public class SrpConnection implements RedisConnection { public byte[] getSet(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.getset(key, value); + pipeline(pipeline.getset(key, value)); return null; } return client.getset(key, value).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -645,12 +702,12 @@ public class SrpConnection implements RedisConnection { public Long append(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.append(key, value); + pipeline(pipeline.append(key, value)); return null; } return client.append(key, value).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -658,12 +715,12 @@ public class SrpConnection implements RedisConnection { public List mGet(byte[]... keys) { try { if (isPipelined()) { - pipeline.mget((Object[]) keys); + pipeline(pipeline.mget((Object[]) keys)); return null; } return SrpUtils.toBytesList(client.mget((Object[]) keys).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -671,12 +728,12 @@ public class SrpConnection implements RedisConnection { public void mSet(Map tuples) { try { if (isPipelined()) { - pipeline.mset((Object[]) SrpUtils.convert(tuples)); + pipeline(pipeline.mset((Object[]) SrpUtils.convert(tuples))); return; } client.mset((Object[]) SrpUtils.convert(tuples)); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -684,12 +741,12 @@ public class SrpConnection implements RedisConnection { public void mSetNX(Map tuples) { try { if (isPipelined()) { - pipeline.msetnx((Object[]) SrpUtils.convert(tuples)); + pipeline(pipeline.msetnx((Object[]) SrpUtils.convert(tuples))); return; } client.msetnx((Object[]) SrpUtils.convert(tuples)); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -697,12 +754,12 @@ public class SrpConnection implements RedisConnection { public void setEx(byte[] key, long time, byte[] value) { try { if (isPipelined()) { - pipeline.setex(key, time, value); + pipeline(pipeline.setex(key, time, value)); return; } client.setex(key, time, value); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -710,12 +767,12 @@ public class SrpConnection implements RedisConnection { public Boolean setNX(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.setnx(key, value); + pipeline(pipeline.setnx(key, value)); return null; } return client.setnx(key, value).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -723,12 +780,12 @@ public class SrpConnection implements RedisConnection { public byte[] getRange(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.getrange(key, start, end); + pipeline(pipeline.getrange(key, start, end)); return null; } return client.getrange(key, start, end).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -736,12 +793,12 @@ public class SrpConnection implements RedisConnection { public Long decr(byte[] key) { try { if (isPipelined()) { - pipeline.decr(key); + pipeline(pipeline.decr(key)); return null; } return client.decr(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -749,12 +806,12 @@ public class SrpConnection implements RedisConnection { public Long decrBy(byte[] key, long value) { try { if (isPipelined()) { - pipeline.decrby(key, value); + pipeline(pipeline.decrby(key, value)); return null; } return client.decrby(key, value).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -762,12 +819,12 @@ public class SrpConnection implements RedisConnection { public Long incr(byte[] key) { try { if (isPipelined()) { - pipeline.incr(key); + pipeline(pipeline.incr(key)); return null; } return client.incr(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -775,12 +832,12 @@ public class SrpConnection implements RedisConnection { public Long incrBy(byte[] key, long value) { try { if (isPipelined()) { - pipeline.incrby(key, value); + pipeline(pipeline.incrby(key, value)); return null; } return client.incrby(key, value).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -795,7 +852,7 @@ public class SrpConnection implements RedisConnection { } return (client.getbit(key, offset).data() == 1); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -810,7 +867,7 @@ public class SrpConnection implements RedisConnection { } client.setbit(key, offset, SrpUtils.asBit(value)); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -825,7 +882,7 @@ public class SrpConnection implements RedisConnection { } client.setrange(key, start, value); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -833,12 +890,12 @@ public class SrpConnection implements RedisConnection { public Long strLen(byte[] key) { try { if (isPipelined()) { - pipeline.strlen(key); + pipeline(pipeline.strlen(key)); return null; } return client.strlen(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -850,12 +907,12 @@ public class SrpConnection implements RedisConnection { public Long lPush(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.lpush(key, new Object[] { value }); + pipeline(pipeline.lpush(key, new Object[] { value })); return null; } return client.lpush(key, new Object[] { value }).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -863,12 +920,12 @@ public class SrpConnection implements RedisConnection { public Long rPush(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.rpush(key, new Object[] { value }); + pipeline(pipeline.rpush(key, new Object[] { value })); return null; } return client.rpush(key, new Object[] { value }).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -876,13 +933,13 @@ public class SrpConnection implements RedisConnection { public List bLPop(int timeout, byte[]... keys) { try { if (isPipelined()) { - // pipeline.blpop(timeout, keys); + // pipeline(pipeline.blpop(timeout, keys)); return null; } // return SrpUtils.toBytesList(client.blpop(timeout, keys).data()); throw new UnsupportedOperationException(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -890,13 +947,13 @@ public class SrpConnection implements RedisConnection { public List bRPop(int timeout, byte[]... keys) { try { if (isPipelined()) { - // pipeline.brpop(timeout, keys); + // pipeline(pipeline.brpop(timeout, keys)); return null; } // return SrpUtils.toBytesList(client.brpop(timeout, keys).data()); throw new UnsupportedOperationException(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -904,12 +961,12 @@ public class SrpConnection implements RedisConnection { public byte[] lIndex(byte[] key, long index) { try { if (isPipelined()) { - pipeline.lindex(key, index); + pipeline(pipeline.lindex(key, index)); return null; } return client.lindex(key, index).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -917,12 +974,12 @@ public class SrpConnection implements RedisConnection { public Long lInsert(byte[] key, Position where, byte[] pivot, byte[] value) { try { if (isPipelined()) { - pipeline.linsert(key, SrpUtils.convertPosition(where), pivot, value); + pipeline(pipeline.linsert(key, SrpUtils.convertPosition(where), pivot, value)); return null; } return client.linsert(key, SrpUtils.convertPosition(where), pivot, value).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -930,12 +987,12 @@ public class SrpConnection implements RedisConnection { public Long lLen(byte[] key) { try { if (isPipelined()) { - pipeline.llen(key); + pipeline(pipeline.llen(key)); return null; } return client.llen(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -943,12 +1000,12 @@ public class SrpConnection implements RedisConnection { public byte[] lPop(byte[] key) { try { if (isPipelined()) { - pipeline.lpop(key); + pipeline(pipeline.lpop(key)); return null; } return client.lpop(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -956,12 +1013,12 @@ public class SrpConnection implements RedisConnection { public List lRange(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.lrange(key, start, end); + pipeline(pipeline.lrange(key, start, end)); return null; } return SrpUtils.toBytesList(client.lrange(key, start, end).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -969,12 +1026,12 @@ public class SrpConnection implements RedisConnection { public Long lRem(byte[] key, long count, byte[] value) { try { if (isPipelined()) { - pipeline.lrem(key, count, value); + pipeline(pipeline.lrem(key, count, value)); return null; } return client.lrem(key, count, value).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -982,12 +1039,12 @@ public class SrpConnection implements RedisConnection { public void lSet(byte[] key, long index, byte[] value) { try { if (isPipelined()) { - pipeline.lset(key, index, value); + pipeline(pipeline.lset(key, index, value)); return; } client.lset(key, index, value); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -995,12 +1052,12 @@ public class SrpConnection implements RedisConnection { public void lTrim(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.ltrim(key, start, end); + pipeline(pipeline.ltrim(key, start, end)); return; } client.ltrim(key, start, end); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1008,12 +1065,12 @@ public class SrpConnection implements RedisConnection { public byte[] rPop(byte[] key) { try { if (isPipelined()) { - pipeline.rpop(key); + pipeline(pipeline.rpop(key)); return null; } return client.rpop(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1021,12 +1078,12 @@ public class SrpConnection implements RedisConnection { public byte[] rPopLPush(byte[] srcKey, byte[] dstKey) { try { if (isPipelined()) { - pipeline.rpoplpush(srcKey, dstKey); + pipeline(pipeline.rpoplpush(srcKey, dstKey)); return null; } return client.rpoplpush(srcKey, dstKey).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1034,12 +1091,12 @@ public class SrpConnection implements RedisConnection { public byte[] bRPopLPush(int timeout, byte[] srcKey, byte[] dstKey) { try { if (isPipelined()) { - pipeline.brpoplpush(srcKey, dstKey, timeout); + pipeline(pipeline.brpoplpush(srcKey, dstKey, timeout)); return null; } return client.brpoplpush(srcKey, dstKey, timeout).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1047,12 +1104,12 @@ public class SrpConnection implements RedisConnection { public Long lPushX(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.lpushx(key, value); + pipeline(pipeline.lpushx(key, value)); return null; } return client.lpushx(key, value).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1060,12 +1117,12 @@ public class SrpConnection implements RedisConnection { public Long rPushX(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.rpushx(key, value); + pipeline(pipeline.rpushx(key, value)); return null; } return client.rpushx(key, value).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1078,12 +1135,12 @@ public class SrpConnection implements RedisConnection { public Boolean sAdd(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.sadd(key, new Object[] { value }); + pipeline(pipeline.sadd(key, new Object[] { value })); return null; } return (client.sadd(key, new Object[] { value }).data() == 1); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1091,12 +1148,12 @@ public class SrpConnection implements RedisConnection { public Long sCard(byte[] key) { try { if (isPipelined()) { - pipeline.scard(key); + pipeline(pipeline.scard(key)); return null; } return client.scard(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1104,12 +1161,12 @@ public class SrpConnection implements RedisConnection { public Set sDiff(byte[]... keys) { try { if (isPipelined()) { - pipeline.sdiff((Object[]) keys); + pipeline(pipeline.sdiff((Object[]) keys)); return null; } return SrpUtils.toSet(client.sdiff((Object[]) keys).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1117,12 +1174,12 @@ public class SrpConnection implements RedisConnection { public Long sDiffStore(byte[] destKey, byte[]... keys) { try { if (isPipelined()) { - pipeline.sdiffstore(destKey, (Object[]) keys); + pipeline(pipeline.sdiffstore(destKey, (Object[]) keys)); return null; } return client.sdiffstore(destKey, (Object[]) keys).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1130,12 +1187,12 @@ public class SrpConnection implements RedisConnection { public Set sInter(byte[]... keys) { try { if (isPipelined()) { - pipeline.sinter((Object[]) keys); + pipeline(pipeline.sinter((Object[]) keys)); return null; } return SrpUtils.toSet(client.sinter((Object[]) keys).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1143,12 +1200,12 @@ public class SrpConnection implements RedisConnection { public Long sInterStore(byte[] destKey, byte[]... keys) { try { if (isPipelined()) { - pipeline.sinterstore(destKey, (Object[]) keys); + pipeline(pipeline.sinterstore(destKey, (Object[]) keys)); return null; } return client.sinterstore(destKey, (Object[]) keys).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1156,12 +1213,12 @@ public class SrpConnection implements RedisConnection { public Boolean sIsMember(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.sismember(key, value); + pipeline(pipeline.sismember(key, value)); return null; } return client.sismember(key, value).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1169,12 +1226,12 @@ public class SrpConnection implements RedisConnection { public Set sMembers(byte[] key) { try { if (isPipelined()) { - pipeline.smembers(key); + pipeline(pipeline.smembers(key)); return null; } return SrpUtils.toSet(client.smembers(key).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1182,12 +1239,12 @@ public class SrpConnection implements RedisConnection { public Boolean sMove(byte[] srcKey, byte[] destKey, byte[] value) { try { if (isPipelined()) { - pipeline.smove(srcKey, destKey, value); + pipeline(pipeline.smove(srcKey, destKey, value)); return null; } return client.smove(srcKey, destKey, value).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1195,12 +1252,12 @@ public class SrpConnection implements RedisConnection { public byte[] sPop(byte[] key) { try { if (isPipelined()) { - pipeline.spop(key); + pipeline(pipeline.spop(key)); return null; } return client.spop(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1208,12 +1265,12 @@ public class SrpConnection implements RedisConnection { public byte[] sRandMember(byte[] key) { try { if (isPipelined()) { - pipeline.srandmember(key); + pipeline(pipeline.srandmember(key)); return null; } return client.srandmember(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1221,12 +1278,12 @@ public class SrpConnection implements RedisConnection { public Boolean sRem(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.srem(key, new Object[] { value }); + pipeline(pipeline.srem(key, new Object[] { value })); return null; } return client.srem(key, new Object[] { value }).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1234,12 +1291,12 @@ public class SrpConnection implements RedisConnection { public Set sUnion(byte[]... keys) { try { if (isPipelined()) { - pipeline.sunion((Object[]) keys); + pipeline(pipeline.sunion((Object[]) keys)); return null; } return SrpUtils.toSet(client.sunion((Object[]) keys).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1247,12 +1304,12 @@ public class SrpConnection implements RedisConnection { public Long sUnionStore(byte[] destKey, byte[]... keys) { try { if (isPipelined()) { - pipeline.sunionstore(destKey, (Object[]) keys); + pipeline(pipeline.sunionstore(destKey, (Object[]) keys)); return null; } return client.sunionstore(destKey, (Object[]) keys).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1264,12 +1321,12 @@ public class SrpConnection implements RedisConnection { public Boolean zAdd(byte[] key, double score, byte[] value) { try { if (isPipelined()) { - pipeline.zadd(new Object[] { key, score, value }); + pipeline(pipeline.zadd(new Object[] { key, score, value })); return null; } return client.zadd(new Object[] { key, score, value }).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1277,12 +1334,12 @@ public class SrpConnection implements RedisConnection { public Long zCard(byte[] key) { try { if (isPipelined()) { - pipeline.zcard(key); + pipeline(pipeline.zcard(key)); return null; } return client.zcard(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1290,12 +1347,12 @@ public class SrpConnection implements RedisConnection { public Long zCount(byte[] key, double min, double max) { try { if (isQueueing()) { - pipeline.zcount(key, min, max); + pipeline(pipeline.zcount(key, min, max)); return null; } return client.zcount(key, min, max).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1303,12 +1360,12 @@ public class SrpConnection implements RedisConnection { public Double zIncrBy(byte[] key, double increment, byte[] value) { try { if (isPipelined()) { - pipeline.zincrby(key, increment, value); + pipeline(pipeline.zincrby(key, increment, value)); return null; } return SrpUtils.toDouble(client.zincrby(key, increment, value).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1331,24 +1388,24 @@ public class SrpConnection implements RedisConnection { try { if (isQueueing()) { - pipeline.zinterstore(args); + pipeline(pipeline.zinterstore(args)); return null; } return client.zinterstore(args).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } public Set zRange(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.zrange(key, start, end, null); + pipeline(pipeline.zrange(key, start, end, null)); return null; } return SrpUtils.toSet(client.zrange(key, start, end, null).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1356,12 +1413,12 @@ public class SrpConnection implements RedisConnection { public Set zRangeWithScores(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.zrange(key, start, end, SrpUtils.WITHSCORES); + pipeline(pipeline.zrange(key, start, end, SrpUtils.WITHSCORES)); return null; } return SrpUtils.convertTuple(client.zrange(key, start, end, SrpUtils.WITHSCORES)); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1369,12 +1426,12 @@ public class SrpConnection implements RedisConnection { public Set zRangeByScore(byte[] key, double min, double max) { try { if (isPipelined()) { - pipeline.zrangebyscore(key, min, max, null, null); + pipeline(pipeline.zrangebyscore(key, min, max, null, null)); return null; } return SrpUtils.toSet(client.zrangebyscore(key, min, max, null, null).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1382,12 +1439,12 @@ public class SrpConnection implements RedisConnection { public Set zRangeByScoreWithScores(byte[] key, double min, double max) { try { if (isPipelined()) { - pipeline.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, null); + pipeline(pipeline.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, null)); return null; } return SrpUtils.convertTuple(client.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, null)); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1395,12 +1452,12 @@ public class SrpConnection implements RedisConnection { public Set zRevRangeWithScores(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.zrevrange(key, start, end, SrpUtils.WITHSCORES); + pipeline(pipeline.zrevrange(key, start, end, SrpUtils.WITHSCORES)); return null; } return SrpUtils.convertTuple(client.zrevrange(key, start, end, SrpUtils.WITHSCORES)); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1409,12 +1466,12 @@ public class SrpConnection implements RedisConnection { try { byte[] limit = SrpUtils.limit(offset, count); if (isPipelined()) { - pipeline.zrangebyscore(key, min, max, null, limit); + pipeline(pipeline.zrangebyscore(key, min, max, null, limit)); return null; } return SrpUtils.toSet(client.zrangebyscore(key, min, max, null, limit).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1423,12 +1480,12 @@ public class SrpConnection implements RedisConnection { try { byte[] limit = SrpUtils.limit(offset, count); if (isPipelined()) { - pipeline.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, limit); + pipeline(pipeline.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, limit)); return null; } return SrpUtils.convertTuple(client.zrangebyscore(key, min, max, SrpUtils.WITHSCORES, limit)); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1441,7 +1498,7 @@ public class SrpConnection implements RedisConnection { } return SrpUtils.toSet(client.zrevrangebyscore(key, min, max, null, limit).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1453,7 +1510,7 @@ public class SrpConnection implements RedisConnection { } return SrpUtils.toSet(client.zrevrangebyscore(key, min, max, null, null).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1466,7 +1523,7 @@ public class SrpConnection implements RedisConnection { } return SrpUtils.convertTuple(client.zrevrangebyscore(key, min, max, SrpUtils.WITHSCORES, limit)); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1478,7 +1535,7 @@ public class SrpConnection implements RedisConnection { } return SrpUtils.convertTuple(client.zrevrangebyscore(key, min, max, SrpUtils.WITHSCORES, null)); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1492,7 +1549,7 @@ public class SrpConnection implements RedisConnection { } return (Long) client.zrank(key, value).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1500,12 +1557,12 @@ public class SrpConnection implements RedisConnection { public Boolean zRem(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.zrem(key, new Object[] { value }); + pipeline(pipeline.zrem(key, new Object[] { value })); return null; } return client.zrem(key, new Object[] { value }).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1513,12 +1570,12 @@ public class SrpConnection implements RedisConnection { public Long zRemRange(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.zremrangebyrank(key, start, end); + pipeline(pipeline.zremrangebyrank(key, start, end)); return null; } return client.zremrangebyrank(key, start, end).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1526,12 +1583,12 @@ public class SrpConnection implements RedisConnection { public Long zRemRangeByScore(byte[] key, double min, double max) { try { if (isPipelined()) { - pipeline.zremrangebyscore(key, min, max); + pipeline(pipeline.zremrangebyscore(key, min, max)); return null; } return client.zremrangebyscore(key, min, max).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1539,12 +1596,12 @@ public class SrpConnection implements RedisConnection { public Set zRevRange(byte[] key, long start, long end) { try { if (isPipelined()) { - pipeline.zrevrange(key, start, end, null); + pipeline(pipeline.zrevrange(key, start, end, null)); return null; } return SrpUtils.toSet(client.zrevrange(key, start, end, null).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1552,12 +1609,12 @@ public class SrpConnection implements RedisConnection { public Long zRevRank(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.zrevrank(key, value); + pipeline(pipeline.zrevrank(key, value)); return null; } return (Long) client.zrevrank(key, value).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1565,12 +1622,12 @@ public class SrpConnection implements RedisConnection { public Double zScore(byte[] key, byte[] value) { try { if (isPipelined()) { - pipeline.zscore(key, value); + pipeline(pipeline.zscore(key, value)); return null; } return SrpUtils.toDouble(client.zscore(key, value).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1583,12 +1640,12 @@ public class SrpConnection implements RedisConnection { public Long zUnionStore(byte[] destKey, byte[]... sets) { try { if (isPipelined()) { - pipeline.zunionstore(destKey, sets.length, (Object[]) sets); + pipeline(pipeline.zunionstore(destKey, sets.length, (Object[]) sets)); return null; } return client.zunionstore(destKey, sets.length, (Object[]) sets).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1600,12 +1657,12 @@ public class SrpConnection implements RedisConnection { public Boolean hSet(byte[] key, byte[] field, byte[] value) { try { if (isPipelined()) { - pipeline.hset(key, field, value); + pipeline(pipeline.hset(key, field, value)); return null; } return client.hset(key, field, value).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1613,12 +1670,12 @@ public class SrpConnection implements RedisConnection { public Boolean hSetNX(byte[] key, byte[] field, byte[] value) { try { if (isPipelined()) { - pipeline.hsetnx(key, field, value); + pipeline(pipeline.hsetnx(key, field, value)); return null; } return client.hsetnx(key, field, value).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1626,12 +1683,12 @@ public class SrpConnection implements RedisConnection { public Boolean hDel(byte[] key, byte[] field) { try { if (isPipelined()) { - pipeline.hdel(key, new Object[] { field }); + pipeline(pipeline.hdel(key, new Object[] { field })); return null; } return client.hdel(key, new Object[] { field }).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1639,12 +1696,12 @@ public class SrpConnection implements RedisConnection { public Boolean hExists(byte[] key, byte[] field) { try { if (isPipelined()) { - pipeline.hexists(key, field); + pipeline(pipeline.hexists(key, field)); return null; } return client.hexists(key, field).data() == 1; } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1652,12 +1709,12 @@ public class SrpConnection implements RedisConnection { public byte[] hGet(byte[] key, byte[] field) { try { if (isPipelined()) { - pipeline.hget(key, field); + pipeline(pipeline.hget(key, field)); return null; } return client.hget(key, field).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1665,12 +1722,12 @@ public class SrpConnection implements RedisConnection { public Map hGetAll(byte[] key) { try { if (isPipelined()) { - pipeline.hgetall(key); + pipeline(pipeline.hgetall(key)); return null; } return SrpUtils.toMap(client.hgetall(key).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1678,12 +1735,12 @@ public class SrpConnection implements RedisConnection { public Long hIncrBy(byte[] key, byte[] field, long delta) { try { if (isPipelined()) { - pipeline.hincrby(key, field, delta); + pipeline(pipeline.hincrby(key, field, delta)); return null; } return client.hincrby(key, field, delta).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1691,12 +1748,12 @@ public class SrpConnection implements RedisConnection { public Set hKeys(byte[] key) { try { if (isPipelined()) { - pipeline.hkeys(key); + pipeline(pipeline.hkeys(key)); return null; } return SrpUtils.toSet(client.hkeys(key).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1704,12 +1761,12 @@ public class SrpConnection implements RedisConnection { public Long hLen(byte[] key) { try { if (isPipelined()) { - pipeline.hlen(key); + pipeline(pipeline.hlen(key)); return null; } return client.hlen(key).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1717,12 +1774,12 @@ public class SrpConnection implements RedisConnection { public List hMGet(byte[] key, byte[]... fields) { try { if (isPipelined()) { - pipeline.hmget(key, (Object[]) fields); + pipeline(pipeline.hmget(key, (Object[]) fields)); return null; } return SrpUtils.toBytesList(client.hmget(key, (Object[]) fields).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1730,12 +1787,12 @@ public class SrpConnection implements RedisConnection { public void hMSet(byte[] key, Map tuple) { try { if (isPipelined()) { - pipeline.hmset(key, SrpUtils.convert(tuple)); + pipeline(pipeline.hmset(key, SrpUtils.convert(tuple))); return; } client.hmset(key, SrpUtils.convert(tuple)); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1743,12 +1800,12 @@ public class SrpConnection implements RedisConnection { public List hVals(byte[] key) { try { if (isPipelined()) { - pipeline.hvals(key); + pipeline(pipeline.hvals(key)); return null; } return SrpUtils.toBytesList(client.hvals(key).data()); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1763,12 +1820,12 @@ public class SrpConnection implements RedisConnection { throw new UnsupportedOperationException(); } if (isPipelined()) { - pipeline.publish(channel, message); + pipeline(pipeline.publish(channel, message)); return null; } return client.publish(channel, message).data(); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1797,7 +1854,7 @@ public class SrpConnection implements RedisConnection { subscription = new SrpSubscription(listener, client); subscription.pSubscribe(patterns); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1814,7 +1871,7 @@ public class SrpConnection implements RedisConnection { subscription.subscribe(channels); } catch (Exception ex) { - throw convertSRAccessException(ex); + throw convertSrpAccessException(ex); } } @@ -1824,4 +1881,9 @@ public class SrpConnection implements RedisConnection { "Connection already subscribed; use the connection Subscription to cancel or add new channels"); } } + + // processing method that adds a listener to the future in order to track down the results and close the pipeline + private void pipeline(ListenableFuture future) { + callback.addCommand(future); + } } \ No newline at end of file diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java index 94badf377..11dfa2185 100644 --- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java @@ -209,7 +209,7 @@ public abstract class AbstractConnectionIntegrationTests { final BlockingDeque queue = new LinkedBlockingDeque(); final MessageListener ml = new MessageListener() { - + public void onMessage(Message message, byte[] pattern) { queue.add(message); System.out.println("received message"); @@ -225,7 +225,7 @@ public abstract class AbstractConnectionIntegrationTests { final AtomicBoolean flag = new AtomicBoolean(true); Runnable listener = new Runnable() { - + public void run() { subConn.subscribe(ml, channel); System.out.println("Subscribed"); @@ -264,7 +264,7 @@ public abstract class AbstractConnectionIntegrationTests { MessageListener listener = new MessageListener() { - + public void onMessage(Message message, byte[] pattern) { assertArrayEquals(expectedChannel, message.getChannel()); assertArrayEquals(expectedMessage, message.getBody()); @@ -272,7 +272,7 @@ public abstract class AbstractConnectionIntegrationTests { }; Thread th = new Thread(new Runnable() { - + public void run() { // sleep 1 second to let the registration happen try { @@ -301,7 +301,7 @@ public abstract class AbstractConnectionIntegrationTests { MessageListener listener = new MessageListener() { - + public void onMessage(Message message, byte[] pattern) { assertArrayEquals(expectedPattern, pattern); assertArrayEquals(expectedMessage, message.getBody()); @@ -310,7 +310,7 @@ public abstract class AbstractConnectionIntegrationTests { }; Thread th = new Thread(new Runnable() { - + public void run() { // sleep 1 second to let the registration happen try { @@ -340,4 +340,36 @@ public abstract class AbstractConnectionIntegrationTests { connection.execute("iNFo"); connection.execute("SET ", getClass() + "testSetNative", UUID.randomUUID().toString()); } + + @Test(expected = DataAccessException.class) + public void exceptionExecuteNative() throws Exception { + connection.execute("ZadD", getClass() + "#foo\t0.90\titem"); + } + + @Test(expected = RedisPipelineException.class) + public void testExceptionExecuteNativeWithPipeline() throws Exception { + connection.openPipeline(); + connection.execute("iNFo"); + connection.execute("SET ", getClass() + "testSetNative", UUID.randomUUID().toString()); + connection.execute("ZadD", getClass() + "#foo\t0.90\titem"); + connection.closePipeline(); + } + + @Test + public void testExecuteNativeWithPipeline() throws Exception { + String key1 = getClass() + "#ExecuteNativeWithPipeline#1"; + String value1 = UUID.randomUUID().toString(); + String key2 = getClass() + "#ExecuteNativeWithPipeline#2"; + String value2 = UUID.randomUUID().toString(); + + connection.openPipeline(); + connection.execute("SET", key1, value1); + connection.execute("SET", key2, value2); + connection.execute("GET", key1); + connection.execute("GET", key2); + List result = connection.closePipeline(); + assertEquals(4, result.size()); + assertArrayEquals(value1.getBytes(), (byte[]) result.get(2)); + assertArrayEquals(value2.getBytes(), (byte[]) result.get(3)); + } } \ No newline at end of file diff --git a/src/test/java/org/springframework/data/redis/connection/jredis/JRedisConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/jredis/JRedisConnectionIntegrationTests.java index c38a6134c..8d92a6397 100644 --- a/src/test/java/org/springframework/data/redis/connection/jredis/JRedisConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/jredis/JRedisConnectionIntegrationTests.java @@ -91,4 +91,8 @@ public class JRedisConnectionIntegrationTests extends AbstractConnectionIntegrat public void testBitSet() throws Exception { } + @Ignore + public void exceptionExecuteNativeWithPipeline() throws Exception { + } + } \ No newline at end of file