Merge branch 'pipeline-exception'

This commit is contained in:
Costin Leau
2012-06-25 21:21:26 +03:00
8 changed files with 520 additions and 297 deletions

View File

@@ -93,7 +93,8 @@ public interface RedisConnection extends RedisCommands {
* Executes the commands in the pipeline and returns their result.
* If the connection is not pipelined, an empty collection is returned.
*
* @throws RedisPipelineException if the pipeline contains any incorrect/invalid statements
* @return the result of the executed commands.
*/
List<Object> closePipeline();
List<Object> closePipeline() throws RedisPipelineException;
}

View File

@@ -0,0 +1,90 @@
/*
* Copyright 2011-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection;
import java.util.Collections;
import java.util.List;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
/**
* Exception thrown when executing/closing a pipeline that contains one or multiple invalid/incorrect statements.
* The exception might also contain the pipeline result (if the driver returns it), allowing for analysis and tracing.
* <p/>
* Typically, the first exception returned by the pipeline is used as the <i>cause</i> of this exception for easier
* debugging.
*
* @author Costin Leau
*/
public class RedisPipelineException extends InvalidDataAccessResourceUsageException {
private final List<Object> results;
/**
* Constructs a new <code>RedisPipelineException</code> instance.
*
* @param msg the message
* @param cause the cause
* @param pipelineResult the pipeline result
*/
public RedisPipelineException(String msg, Throwable cause, List<Object> pipelineResult) {
super(msg, cause);
results = Collections.unmodifiableList(pipelineResult);
}
/**
* Constructs a new <code>RedisPipelineException</code> instance using a default message.
*
* @param cause the cause
* @param pipelineResult the pipeline result
*/
public RedisPipelineException(Exception cause, List<Object> pipelineResult) {
this("Pipeline contained one or more invalid commands", cause, pipelineResult);
}
/**
* Constructs a new <code>RedisPipelineException</code> instance using a default message
* and an empty pipeline result list.
*
* @param cause the cause
*/
public RedisPipelineException(Exception cause) {
this("Pipeline contained one or more invalid commands", cause, Collections.emptyList());
}
/**
* Constructs a new <code>RedisPipelineException</code> instance.
*
* @param msg message
* @param pipelineResult pipeline partial results
*/
public RedisPipelineException(String msg, List<Object> pipelineResult) {
super(msg);
results = Collections.unmodifiableList(pipelineResult);
}
/**
* Optionally returns the result of the pipeline that caused the exception.
* Typically contains both the results of the successful statements but also
* the exceptions of the incorrect ones.
*
* @return result of the pipeline
*/
public List<Object> getPipelineResult() {
return results;
}
}

View File

@@ -30,6 +30,7 @@ import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisPipelineException;
import org.springframework.data.redis.connection.RedisSubscribedConnectionException;
import org.springframework.data.redis.connection.SortParameters;
import org.springframework.data.redis.connection.Subscription;
@@ -68,8 +69,8 @@ public class JedisConnection implements RedisConnection {
static {
CLIENT_FIELD = ReflectionUtils.findField(BinaryJedis.class, "client", Client.class);
ReflectionUtils.makeAccessible(CLIENT_FIELD);
SEND_COMMAND = ReflectionUtils.findMethod(Connection.class, "sendCommand",
new Class[] { Command.class, byte[][].class });
SEND_COMMAND = ReflectionUtils.findMethod(Connection.class, "sendCommand", new Class[] { Command.class,
byte[][].class });
ReflectionUtils.makeAccessible(SEND_COMMAND);
GET_RESPONSE = ReflectionUtils.findMethod(Queable.class, "getResponse", Builder.class);
ReflectionUtils.makeAccessible(GET_RESPONSE);
@@ -135,27 +136,33 @@ public class JedisConnection implements RedisConnection {
public Object execute(String command, byte[]... args) {
Assert.hasText(command, "a valid command needs to be specified");
List<byte[]> mArgs = new ArrayList<byte[]>();
if (!ObjectUtils.isEmpty(args)) {
Collections.addAll(mArgs, args);
try {
List<byte[]> mArgs = new ArrayList<byte[]>();
if (!ObjectUtils.isEmpty(args)) {
Collections.addAll(mArgs, args);
}
Object result = ReflectionUtils.invokeMethod(SEND_COMMAND, client,
Command.valueOf(command.trim().toUpperCase()), mArgs.toArray(new byte[mArgs.size()][]));
if (isQueueing() || isPipelined()) {
Object target = (isPipelined() ? pipeline : transaction);
ReflectionUtils.invokeMethod(GET_RESPONSE, target, new Builder<Object>() {
public Object build(Object data) {
return data;
}
public String toString() {
return "Object";
}
});
}
else {
client.getOne();
}
return result;
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
Object result = ReflectionUtils.invokeMethod(SEND_COMMAND, client,
Command.valueOf(command.trim().toUpperCase()), mArgs.toArray(new byte[mArgs.size()][]));
if (isQueueing() || isPipelined()) {
Object target = (isPipelined() ? pipeline : transaction);
ReflectionUtils.invokeMethod(GET_RESPONSE, target, new Builder<Object>() {
public Object build(Object data) {
return data;
}
public String toString() {
return "Object";
}
});
}
return result;
}
public void close() throws DataAccessException {
@@ -230,15 +237,30 @@ public class JedisConnection implements RedisConnection {
@SuppressWarnings("unchecked")
public List<Object> closePipeline() {
if (pipeline != null) {
List execute = pipeline.syncAndReturnAll();
List<Object> execute = pipeline.syncAndReturnAll();
if (execute != null && !execute.isEmpty()) {
Exception cause = null;
for (int i = 0; i < execute.size(); i++) {
Object object = execute.get(i);
if (object instanceof Exception) {
DataAccessException dataAccessException = convertJedisAccessException((Exception) object);
if (cause == null) {
cause = dataAccessException;
}
execute.set(i, dataAccessException);
}
}
if (cause != null) {
throw new RedisPipelineException(cause, execute);
}
return execute;
}
}
return Collections.emptyList();
}
public List<byte[]> sort(byte[] key, SortParameters params) {
SortingParams sortParams = JedisUtils.convertSortParams(params);

View File

@@ -89,14 +89,17 @@ public class JredisConnection implements RedisConnection {
public Object execute(String command, byte[]... args) {
Assert.hasText(command, "a valid command needs to be specified");
List<byte[]> mArgs = new ArrayList<byte[]>();
if (!ObjectUtils.isEmpty(args)) {
Collections.addAll(mArgs, args);
try {
List<byte[]> mArgs = new ArrayList<byte[]>();
if (!ObjectUtils.isEmpty(args)) {
Collections.addAll(mArgs, args);
}
return ReflectionUtils.invokeMethod(SERVICE_REQUEST, jredis, Command.valueOf(command.trim().toUpperCase()),
mArgs.toArray(new byte[mArgs.size()][]));
} catch (Exception ex) {
throw convertJredisAccessException(ex);
}
return ReflectionUtils.invokeMethod(SERVICE_REQUEST, jredis, Command.valueOf(command.trim().toUpperCase()),
mArgs.toArray(new byte[mArgs.size()][]));
}
public void close() throws RedisSystemException {

View File

@@ -34,6 +34,7 @@ import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisPipelineException;
import org.springframework.data.redis.connection.RedisSubscribedConnectionException;
import org.springframework.data.redis.connection.SortParameters;
import org.springframework.data.redis.connection.Subscription;
@@ -83,9 +84,16 @@ public class RjcConnection implements RedisConnection {
public Object execute(String command, byte[]... args) {
Assert.hasText(command, "a valid command needs to be specified");
connection.sendCommand(Command.valueOf(command.trim().toUpperCase()),
(ObjectUtils.isEmpty(args) ? new byte[0][] : args));
return connection.getAll();
try {
connection.sendCommand(Command.valueOf(command.trim().toUpperCase()),
(ObjectUtils.isEmpty(args) ? new byte[0][] : args));
if (!isPipelined()) {
return connection.getAll();
}
return null;
} catch (Exception ex) {
throw convertRjcAccessException(ex);
}
}
public void close() throws DataAccessException {
@@ -135,9 +143,10 @@ public class RjcConnection implements RedisConnection {
@SuppressWarnings("unchecked")
public List<Object> closePipeline() {
if (pipeline != null) {
List execute = client.getAll();
if (execute != null && !execute.isEmpty()) {
return execute;
try {
List execute = client.getAll();
} catch (Exception ex) {
throw new RedisPipelineException(convertRjcAccessException(ex));
}
}
return Collections.emptyList();

View File

@@ -209,7 +209,7 @@ public abstract class AbstractConnectionIntegrationTests {
final BlockingDeque<Message> queue = new LinkedBlockingDeque<Message>();
final MessageListener ml = new MessageListener() {
public void onMessage(Message message, byte[] pattern) {
queue.add(message);
System.out.println("received message");
@@ -225,7 +225,7 @@ public abstract class AbstractConnectionIntegrationTests {
final AtomicBoolean flag = new AtomicBoolean(true);
Runnable listener = new Runnable() {
public void run() {
subConn.subscribe(ml, channel);
System.out.println("Subscribed");
@@ -264,7 +264,7 @@ public abstract class AbstractConnectionIntegrationTests {
MessageListener listener = new MessageListener() {
public void onMessage(Message message, byte[] pattern) {
assertArrayEquals(expectedChannel, message.getChannel());
assertArrayEquals(expectedMessage, message.getBody());
@@ -272,7 +272,7 @@ public abstract class AbstractConnectionIntegrationTests {
};
Thread th = new Thread(new Runnable() {
public void run() {
// sleep 1 second to let the registration happen
try {
@@ -301,7 +301,7 @@ public abstract class AbstractConnectionIntegrationTests {
MessageListener listener = new MessageListener() {
public void onMessage(Message message, byte[] pattern) {
assertArrayEquals(expectedPattern, pattern);
assertArrayEquals(expectedMessage, message.getBody());
@@ -310,7 +310,7 @@ public abstract class AbstractConnectionIntegrationTests {
};
Thread th = new Thread(new Runnable() {
public void run() {
// sleep 1 second to let the registration happen
try {
@@ -340,4 +340,36 @@ public abstract class AbstractConnectionIntegrationTests {
connection.execute("iNFo");
connection.execute("SET ", getClass() + "testSetNative", UUID.randomUUID().toString());
}
@Test(expected = DataAccessException.class)
public void exceptionExecuteNative() throws Exception {
connection.execute("ZadD", getClass() + "#foo\t0.90\titem");
}
@Test(expected = RedisPipelineException.class)
public void testExceptionExecuteNativeWithPipeline() throws Exception {
connection.openPipeline();
connection.execute("iNFo");
connection.execute("SET ", getClass() + "testSetNative", UUID.randomUUID().toString());
connection.execute("ZadD", getClass() + "#foo\t0.90\titem");
connection.closePipeline();
}
@Test
public void testExecuteNativeWithPipeline() throws Exception {
String key1 = getClass() + "#ExecuteNativeWithPipeline#1";
String value1 = UUID.randomUUID().toString();
String key2 = getClass() + "#ExecuteNativeWithPipeline#2";
String value2 = UUID.randomUUID().toString();
connection.openPipeline();
connection.execute("SET", key1, value1);
connection.execute("SET", key2, value2);
connection.execute("GET", key1);
connection.execute("GET", key2);
List<Object> result = connection.closePipeline();
assertEquals(4, result.size());
assertArrayEquals(value1.getBytes(), (byte[]) result.get(2));
assertArrayEquals(value2.getBytes(), (byte[]) result.get(3));
}
}

View File

@@ -91,4 +91,8 @@ public class JRedisConnectionIntegrationTests extends AbstractConnectionIntegrat
public void testBitSet() throws Exception {
}
@Ignore
public void exceptionExecuteNativeWithPipeline() throws Exception {
}
}