add pipeline exception for Rjc

This commit is contained in:
Costin Leau
2012-06-25 19:49:20 +03:00
parent 64c527c088
commit 7a89656cf0
4 changed files with 39 additions and 12 deletions

View File

@@ -93,7 +93,8 @@ public interface RedisConnection extends RedisCommands {
* Executes the commands in the pipeline and returns their result.
* If the connection is not pipelined, an empty collection is returned.
*
* @throws RedisPipelineException if the pipeline contains any incorrect/invalid statements
* @return the result of the executed commands.
*/
List<Object> closePipeline();
List<Object> closePipeline() throws RedisPipelineException;
}

View File

@@ -23,7 +23,7 @@ import org.springframework.dao.InvalidDataAccessResourceUsageException;
/**
* Exception thrown when executing/closing a pipeline that contains one or multiple invalid/incorrect statements.
* The exception contains the pipeline result, allowing for analysis and tracing.
* The exception might also contain the pipeline result (if the driver returns it), allowing for analysis and tracing.
* <p/>
* Typically, the first exception returned by the pipeline is used as the <i>cause</i> of this exception for easier
* debugging.
@@ -56,6 +56,15 @@ public class RedisPipelineException extends InvalidDataAccessResourceUsageExcept
this("Pipeline contained one or more invalid commands", cause, pipelineResult);
}
/**
* Constructs a new <code>RedisPipelineException</code> instance using a default message
* and an empty pipeline result list.
*
* @param cause the cause
*/
public RedisPipelineException(Exception cause) {
this("Pipeline contained one or more invalid commands", cause, Collections.emptyList());
}
/**
* Constructs a new <code>RedisPipelineException</code> instance.
@@ -69,7 +78,7 @@ public class RedisPipelineException extends InvalidDataAccessResourceUsageExcept
}
/**
* Returns the result of the pipeline that caused the exception.
* Optionally returns the result of the pipeline that caused the exception.
* Typically contains both the results of the successful statements but also
* the exceptions of the incorrect ones.
*

View File

@@ -34,6 +34,7 @@ import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisPipelineException;
import org.springframework.data.redis.connection.RedisSubscribedConnectionException;
import org.springframework.data.redis.connection.SortParameters;
import org.springframework.data.redis.connection.Subscription;
@@ -142,9 +143,10 @@ public class RjcConnection implements RedisConnection {
@SuppressWarnings("unchecked")
public List<Object> closePipeline() {
if (pipeline != null) {
List execute = client.getAll();
if (execute != null && !execute.isEmpty()) {
return execute;
try {
List execute = client.getAll();
} catch (Exception ex) {
throw new RedisPipelineException(convertRjcAccessException(ex));
}
}
return Collections.emptyList();

View File

@@ -209,7 +209,7 @@ public abstract class AbstractConnectionIntegrationTests {
final BlockingDeque<Message> queue = new LinkedBlockingDeque<Message>();
final MessageListener ml = new MessageListener() {
public void onMessage(Message message, byte[] pattern) {
queue.add(message);
System.out.println("received message");
@@ -225,7 +225,7 @@ public abstract class AbstractConnectionIntegrationTests {
final AtomicBoolean flag = new AtomicBoolean(true);
Runnable listener = new Runnable() {
public void run() {
subConn.subscribe(ml, channel);
System.out.println("Subscribed");
@@ -264,7 +264,7 @@ public abstract class AbstractConnectionIntegrationTests {
MessageListener listener = new MessageListener() {
public void onMessage(Message message, byte[] pattern) {
assertArrayEquals(expectedChannel, message.getChannel());
assertArrayEquals(expectedMessage, message.getBody());
@@ -272,7 +272,7 @@ public abstract class AbstractConnectionIntegrationTests {
};
Thread th = new Thread(new Runnable() {
public void run() {
// sleep 1 second to let the registration happen
try {
@@ -301,7 +301,7 @@ public abstract class AbstractConnectionIntegrationTests {
MessageListener listener = new MessageListener() {
public void onMessage(Message message, byte[] pattern) {
assertArrayEquals(expectedPattern, pattern);
assertArrayEquals(expectedMessage, message.getBody());
@@ -310,7 +310,7 @@ public abstract class AbstractConnectionIntegrationTests {
};
Thread th = new Thread(new Runnable() {
public void run() {
// sleep 1 second to let the registration happen
try {
@@ -340,4 +340,19 @@ public abstract class AbstractConnectionIntegrationTests {
connection.execute("iNFo");
connection.execute("SET ", getClass() + "testSetNative", UUID.randomUUID().toString());
}
@Test(expected = DataAccessException.class)
public void exceptionExecuteNative() throws Exception {
connection.execute("ZadD", getClass() + "#foo\t0.90\titem");
}
@Test(expected = DataAccessException.class)
public void exceptionExecuteNativeWithPipeline() throws Exception {
connection.openPipeline();
connection.execute("iNFo");
connection.execute("SET ", getClass() + "testSetNative", UUID.randomUUID().toString());
connection.execute("ZadD", getClass() + "#foo\t0.90\titem");
connection.closePipeline();
}
}