Fix SRP exec not returning results of commands

executed in the tx

DATAREDIS-123

- Close pipeline on exec if not explicitly
opened by user and return results of closePipeline,
not result of exec command

- Close pipeline on discard if not explicitly
opened by user
This commit is contained in:
Jennifer Hickey
2013-04-04 20:46:08 -07:00
parent 7813a13f7a
commit aaa28495be
8 changed files with 268 additions and 78 deletions

View File

@@ -61,6 +61,7 @@ public class SrpConnection implements RedisConnection {
private boolean isClosed = false;
private boolean isMulti = false;
private boolean pipelineRequested = false;
private Pipeline pipeline;
private PipelineTracker callback;
private volatile SrpSubscription subscription;
@@ -173,42 +174,14 @@ public class SrpConnection implements RedisConnection {
public void openPipeline() {
if (pipeline == null) {
callback = new PipelineTracker();
pipeline = client.pipeline();
}
pipelineRequested = true;
initPipeline();
}
public List<Object> closePipeline() {
if (pipeline != null) {
pipeline = null;
List<Object> execute = new ArrayList<Object>(callback.complete());
callback.close();
callback = null;
if (execute != null && !execute.isEmpty()) {
Exception cause = null;
for (int i = 0; i < execute.size(); i++) {
Object object = execute.get(i);
if (object instanceof Exception) {
DataAccessException dataAccessException = convertSrpAccessException((Exception) object);
if (cause == null) {
cause = dataAccessException;
}
execute.set(i, dataAccessException);
}
}
if (cause != null) {
throw new RedisPipelineException(cause, execute);
}
return execute;
}
}
return Collections.emptyList();
return closePipeline(true);
}
public List<byte[]> sort(byte[] key, SortParameters params) {
byte[] sort = SrpUtils.sort(params);
@@ -440,11 +413,10 @@ public class SrpConnection implements RedisConnection {
public void discard() {
isMulti = false;
try {
if (isPipelined()) {
// use the normal path
}
client.discard();
if (!pipelineRequested) {
closePipeline(false);
}
} catch (Exception ex) {
throw convertSrpAccessException(ex);
}
@@ -455,10 +427,12 @@ public class SrpConnection implements RedisConnection {
isMulti = false;
try {
Future<Boolean> exec = client.exec();
if (!isPipelined()) {
exec.get();
// Need to wait on execution or subsequent non-pipelined calls like multi may read exec results
exec.get();
if (pipelineRequested) {
return null;
}
return Collections.singletonList((Object) exec);
return closePipeline();
} catch (Exception ex) {
throw convertSrpAccessException(ex);
}
@@ -522,12 +496,8 @@ public class SrpConnection implements RedisConnection {
return;
}
isMulti = true;
openPipeline();
initPipeline();
try {
if (isPipelined()) {
client.multi();
return;
}
client.multi();
} catch (Exception ex) {
throw convertSrpAccessException(ex);
@@ -1899,4 +1869,48 @@ public class SrpConnection implements RedisConnection {
private void pipeline(ListenableFuture<? extends Reply> future) {
callback.addCommand(future);
}
private void initPipeline() {
if (pipeline == null) {
callback = new PipelineTracker();
pipeline = client.pipeline();
}
}
private List<Object> closePipeline(boolean getResults) {
pipelineRequested = false;
List<Object> results = Collections.emptyList();
if (pipeline != null) {
pipeline = null;
if(getResults) {
results = getPipelinedResults();
}
callback.close();
callback = null;
}
return results;
}
private List<Object> getPipelinedResults() {
List<Object> execute = new ArrayList<Object>(callback.complete());
if (execute != null && !execute.isEmpty()) {
Exception cause = null;
for (int i = 0; i < execute.size(); i++) {
Object object = execute.get(i);
if (object instanceof Exception) {
DataAccessException dataAccessException = convertSrpAccessException((Exception) object);
if (cause == null) {
cause = dataAccessException;
}
execute.set(i, dataAccessException);
}
}
if (cause != null) {
throw new RedisPipelineException(cause, execute);
}
return execute;
}
return Collections.emptyList();
}
}

View File

@@ -106,7 +106,7 @@ abstract public class AbstractConnectionPipelineIntegrationTests extends
@Test(expected = RedisPipelineException.class)
public void exceptionExecuteNative() throws Exception {
connection.execute("ZadD", getClass() + "#foo\t0.90\titem");
connection.closePipeline();
getResults();
}
@Test

View File

@@ -1,6 +1,8 @@
package org.springframework.data.redis.connection.lettuce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
@@ -23,6 +25,15 @@ public class LettuceConnectionPipelineTxIntegrationTests extends
getResults();
}
@Test
public void testDbSize() {
connection.set("dbparam", "foo");
assertNull(connection.dbSize());
List<Object> results = getResults();
assertEquals(1, results.size());
assertTrue((Long) results.get(0) > 0);
}
protected void initConnection() {
connection.openPipeline();
connection.multi();
@@ -32,6 +43,7 @@ public class LettuceConnectionPipelineTxIntegrationTests extends
assertNull(connection.exec());
List<Object> results = new ArrayList<Object>();
List<Object> pipelinedResults = connection.closePipeline();
// filter out the return value of exec
for (Object result : pipelinedResults) {
if (!"OK".equals(result) && !("QUEUED").equals(result)) {
results.add(result);

View File

@@ -15,11 +15,6 @@
*/
package org.springframework.data.redis.connection.lettuce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -90,15 +85,6 @@ public class LettuceConnectionTransactionIntegrationTests extends
public void testBRPopLPushTimeout() {
}
@Test
public void testDbSize() {
connection.set("dbparam", "foo");
assertNull(connection.dbSize());
List<Object> results = getResults();
assertEquals(1, results.size());
assertTrue((Long) results.get(0) > 0);
}
@Test
public void exceptionExecuteNative() throws Exception {
actual.add(connection.execute("ZadD", getClass() + "#foo\t0.90\titem"));
@@ -112,13 +98,6 @@ public class LettuceConnectionTransactionIntegrationTests extends
}
protected List<Object> getResults() {
List<Object> results = new ArrayList<Object>();
List<Object> txResults = connection.exec();
for (Object result : txResults) {
if (!"OK".equals(result) && !("QUEUED").equals(result)) {
results.add(result);
}
}
return results;
return connection.exec();
}
}

View File

@@ -53,22 +53,14 @@ public class SrpConnectionIntegrationTests extends AbstractConnectionIntegration
connection = null;
}
@Ignore("DATAREDIS-123, exec does not return command results")
public void testMultiExec() throws Exception {
}
@Ignore("DATAREDIS-123, exec does not return command results")
@Ignore("DATAREDIS-169 SRP discard does not clear txReplies, results in inconsistent results on next tx exec")
public void testMultiDiscard() {
}
@Ignore("DATAREDIS-123, exec does not return command results")
@Ignore("DATAREDIS-168 SRP exec throws TransactionFailedException if watched value modified")
public void testWatch() {
}
@Ignore("DATAREDIS-123, exec does not return command results")
public void testUnwatch() {
}
@Ignore("DATAREDIS-130, sort not working")
public void testSort() {
}

View File

@@ -114,7 +114,7 @@ public class SrpConnectionPipelineIntegrationTests extends
@Test
public void testInfo() throws Exception {
assertNull(connection.info());
List<Object> results = connection.closePipeline();
List<Object> results = getResults();
assertEquals(1, results.size());
Properties info = SrpUtils.info(new BulkReply((byte[]) results.get(0)));
assertTrue("at least 5 settings should be present", info.size() >= 5);
@@ -176,7 +176,7 @@ public class SrpConnectionPipelineIntegrationTests extends
super.testGetRangeSetRange();
}
private int getRedisVersion() {
protected int getRedisVersion() {
connection.closePipeline();
int version = RedisClientBase.parseVersion((String)connection.info().get("redis_version"));
connection.openPipeline();

View File

@@ -0,0 +1,90 @@
/*
* Copyright 2011-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.srp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.List;
import org.junit.Test;
import org.springframework.data.redis.connection.RedisPipelineException;
import redis.client.RedisClientBase;
/**
* Integration test of {@link SrpConnection} transactions within a pipeline
*
* @author Jennifer Hickey
*
*/
public class SrpConnectionPipelineTxIntegrationTests extends
SrpConnectionTransactionIntegrationTests {
@Test(expected = RedisPipelineException.class)
public void exceptionExecuteNative() throws Exception {
connection.execute("ZadD", getClass() + "#foo\t0.90\titem");
getResults();
}
@Test
public void testUsePipelineAfterTxExec() {
connection.set("foo", "bar");
assertNull(connection.exec());
assertNull(connection.get("foo"));
List<Object> results = connection.closePipeline();
assertEquals(2, results.size());
assertEquals("OK", results.get(0));
assertEquals("bar", new String((byte[]) results.get(1)));
assertEquals("bar", connection.get("foo"));
}
@Test
public void testExec2TransactionsInPipeline() {
connection.set("foo", "bar");
assertNull(connection.get("foo"));
assertNull(connection.exec());
connection.multi();
connection.set("foo", "baz");
assertNull(connection.get("foo"));
assertNull(connection.exec());
List<Object> results = connection.closePipeline();
assertEquals(4, results.size());
assertEquals("OK", results.get(0));
assertEquals("bar", new String((byte[]) results.get(1)));
assertEquals("OK", results.get(2));
assertEquals("baz", new String((byte[]) results.get(3)));
}
protected void initConnection() {
connection.openPipeline();
connection.multi();
}
protected List<Object> getResults() {
assertNull(connection.exec());
return connection.closePipeline();
}
protected int getRedisVersion() {
connection.exec();
connection.closePipeline();
int version = RedisClientBase.parseVersion((String) connection.info()
.get("redis_version"));
initConnection();
return version;
}
}

View File

@@ -0,0 +1,103 @@
/*
* Copyright 2011-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.srp;
import java.util.List;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.data.redis.RedisSystemException;
import redis.client.RedisClientBase;
/**
* Integration test of {@link SrpConnection} functionality within a transaction
*
* @author Jennifer Hickey
*
*/
public class SrpConnectionTransactionIntegrationTests extends
SrpConnectionPipelineIntegrationTests {
@Ignore
public void testMultiDiscard() {
}
@Ignore
public void testMultiExec() {
}
@Ignore
public void testUnwatch() {
}
@Ignore
public void testWatch() {
}
/*
* Using blocking ops inside a tx does not make a lot of sense as it would
* require blocking the entire server in order to execute the block
* atomically, which in turn does not allow other clients to perform a push
* operation.
*/
@Ignore
public void testBLPop() {
}
@Ignore
public void testBRPop() {
}
@Ignore
public void testBRPopLPush() {
}
@Ignore
public void testBLPopTimeout() {
}
@Ignore
public void testBRPopTimeout() {
}
@Ignore
public void testBRPopLPushTimeout() {
}
@Test(expected = RedisSystemException.class)
public void exceptionExecuteNative() throws Exception {
connection.execute("ZadD", getClass() + "#foo\t0.90\titem");
getResults();
}
protected void initConnection() {
connection.multi();
}
protected List<Object> getResults() {
return connection.exec();
}
protected int getRedisVersion() {
connection.exec();
int version = RedisClientBase.parseVersion((String) connection.info()
.get("redis_version"));
connection.multi();
return version;
}
}