fix bug that return an empty list for #closePipeline

This commit is contained in:
Costin Leau
2012-06-25 21:20:58 +03:00
parent d46b23fe1b
commit ebe05a04a0
3 changed files with 29 additions and 8 deletions

View File

@@ -237,7 +237,7 @@ public class JedisConnection implements RedisConnection {
@SuppressWarnings("unchecked")
public List<Object> closePipeline() {
if (pipeline != null) {
List execute = pipeline.syncAndReturnAll();
List<Object> execute = pipeline.syncAndReturnAll();
if (execute != null && !execute.isEmpty()) {
Exception cause = null;
for (int i = 0; i < execute.size(); i++) {
@@ -250,9 +250,12 @@ public class JedisConnection implements RedisConnection {
execute.set(i, dataAccessException);
}
}
if (cause != null) {
throw new RedisPipelineException(cause, execute);
}
return execute;
}
}
return Collections.emptyList();

View File

@@ -64,13 +64,13 @@ public class SrpConnection implements RedisConnection {
private PipelineTracker callback;
private volatile SrpSubscription subscription;
private static class PipelineTracker implements FutureCallback<Object> {
private static class PipelineTracker implements FutureCallback<Reply> {
private final List<Object> results = Collections.synchronizedList(new ArrayList<Object>());
private final List<ListenableFuture<?>> futures = new ArrayList<ListenableFuture<?>>();
private final List<ListenableFuture<? extends Reply>> futures = new ArrayList<ListenableFuture<? extends Reply>>();
public void onSuccess(Object result) {
results.add(result);
public void onSuccess(Reply result) {
results.add(result.data());
}
public void onFailure(Throwable t) {
@@ -87,7 +87,7 @@ public class SrpConnection implements RedisConnection {
return results;
}
public void addCommand(ListenableFuture<?> future) {
public void addCommand(ListenableFuture<? extends Reply> future) {
futures.add(future);
Futures.addCallback(future, this);
}
@@ -191,6 +191,7 @@ public class SrpConnection implements RedisConnection {
if (cause != null) {
throw new RedisPipelineException(cause, execute);
}
return execute;
}
}
@@ -1882,7 +1883,7 @@ public class SrpConnection implements RedisConnection {
}
// processing method that adds a listener to the future in order to track down the results and close the pipeline
private void pipeline(ListenableFuture<?> future) {
private void pipeline(ListenableFuture<? extends Reply> future) {
callback.addCommand(future);
}
}

View File

@@ -347,7 +347,7 @@ public abstract class AbstractConnectionIntegrationTests {
}
@Test(expected = RedisPipelineException.class)
public void exceptionExecuteNativeWithPipeline() throws Exception {
public void testExceptionExecuteNativeWithPipeline() throws Exception {
connection.openPipeline();
connection.execute("iNFo");
connection.execute("SET ", getClass() + "testSetNative", UUID.randomUUID().toString());
@@ -355,4 +355,21 @@ public abstract class AbstractConnectionIntegrationTests {
connection.closePipeline();
}
@Test
public void testExecuteNativeWithPipeline() throws Exception {
String key1 = getClass() + "#ExecuteNativeWithPipeline#1";
String value1 = UUID.randomUUID().toString();
String key2 = getClass() + "#ExecuteNativeWithPipeline#2";
String value2 = UUID.randomUUID().toString();
connection.openPipeline();
connection.execute("SET", key1, value1);
connection.execute("SET", key2, value2);
connection.execute("GET", key1);
connection.execute("GET", key2);
List<Object> result = connection.closePipeline();
assertEquals(4, result.size());
assertArrayEquals(value1.getBytes(), (byte[]) result.get(2));
assertArrayEquals(value2.getBytes(), (byte[]) result.get(3));
}
}