diff --git a/src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java b/src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java index 8a6387354..ca4177d25 100644 --- a/src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/srp/SrpConnection.java @@ -61,6 +61,7 @@ public class SrpConnection implements RedisConnection { private boolean isClosed = false; private boolean isMulti = false; + private boolean pipelineRequested = false; private Pipeline pipeline; private PipelineTracker callback; private volatile SrpSubscription subscription; @@ -173,42 +174,14 @@ public class SrpConnection implements RedisConnection { public void openPipeline() { - if (pipeline == null) { - callback = new PipelineTracker(); - pipeline = client.pipeline(); - } + pipelineRequested = true; + initPipeline(); } public List closePipeline() { - if (pipeline != null) { - pipeline = null; - List execute = new ArrayList(callback.complete()); - callback.close(); - callback = null; - if (execute != null && !execute.isEmpty()) { - Exception cause = null; - for (int i = 0; i < execute.size(); i++) { - Object object = execute.get(i); - if (object instanceof Exception) { - DataAccessException dataAccessException = convertSrpAccessException((Exception) object); - if (cause == null) { - cause = dataAccessException; - } - execute.set(i, dataAccessException); - } - } - if (cause != null) { - throw new RedisPipelineException(cause, execute); - } - - return execute; - } - } - - return Collections.emptyList(); + return closePipeline(true); } - public List sort(byte[] key, SortParameters params) { byte[] sort = SrpUtils.sort(params); @@ -440,11 +413,10 @@ public class SrpConnection implements RedisConnection { public void discard() { isMulti = false; try { - if (isPipelined()) { - // use the normal path - } - client.discard(); + if (!pipelineRequested) { + closePipeline(false); + } } catch (Exception ex) { throw convertSrpAccessException(ex); } @@ -455,10 +427,12 @@ public class SrpConnection implements RedisConnection { isMulti = false; try { Future exec = client.exec(); - if (!isPipelined()) { - exec.get(); + // Need to wait on execution or subsequent non-pipelined calls like multi may read exec results + exec.get(); + if (pipelineRequested) { + return null; } - return Collections.singletonList((Object) exec); + return closePipeline(); } catch (Exception ex) { throw convertSrpAccessException(ex); } @@ -522,12 +496,8 @@ public class SrpConnection implements RedisConnection { return; } isMulti = true; - openPipeline(); + initPipeline(); try { - if (isPipelined()) { - client.multi(); - return; - } client.multi(); } catch (Exception ex) { throw convertSrpAccessException(ex); @@ -1899,4 +1869,48 @@ public class SrpConnection implements RedisConnection { private void pipeline(ListenableFuture future) { callback.addCommand(future); } + + private void initPipeline() { + if (pipeline == null) { + callback = new PipelineTracker(); + pipeline = client.pipeline(); + } + } + + private List closePipeline(boolean getResults) { + pipelineRequested = false; + List results = Collections.emptyList(); + if (pipeline != null) { + pipeline = null; + if(getResults) { + results = getPipelinedResults(); + } + callback.close(); + callback = null; + } + return results; + } + + private List getPipelinedResults() { + List execute = new ArrayList(callback.complete()); + if (execute != null && !execute.isEmpty()) { + Exception cause = null; + for (int i = 0; i < execute.size(); i++) { + Object object = execute.get(i); + if (object instanceof Exception) { + DataAccessException dataAccessException = convertSrpAccessException((Exception) object); + if (cause == null) { + cause = dataAccessException; + } + execute.set(i, dataAccessException); + } + } + if (cause != null) { + throw new RedisPipelineException(cause, execute); + } + + return execute; + } + return Collections.emptyList(); + } } \ No newline at end of file diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionPipelineIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionPipelineIntegrationTests.java index 71338edd7..70e7929d5 100644 --- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionPipelineIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionPipelineIntegrationTests.java @@ -106,7 +106,7 @@ abstract public class AbstractConnectionPipelineIntegrationTests extends @Test(expected = RedisPipelineException.class) public void exceptionExecuteNative() throws Exception { connection.execute("ZadD", getClass() + "#foo\t0.90\titem"); - connection.closePipeline(); + getResults(); } @Test diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionPipelineTxIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionPipelineTxIntegrationTests.java index b097184fe..3f5646cc5 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionPipelineTxIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionPipelineTxIntegrationTests.java @@ -1,6 +1,8 @@ package org.springframework.data.redis.connection.lettuce; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; @@ -23,6 +25,15 @@ public class LettuceConnectionPipelineTxIntegrationTests extends getResults(); } + @Test + public void testDbSize() { + connection.set("dbparam", "foo"); + assertNull(connection.dbSize()); + List results = getResults(); + assertEquals(1, results.size()); + assertTrue((Long) results.get(0) > 0); + } + protected void initConnection() { connection.openPipeline(); connection.multi(); @@ -32,6 +43,7 @@ public class LettuceConnectionPipelineTxIntegrationTests extends assertNull(connection.exec()); List results = new ArrayList(); List pipelinedResults = connection.closePipeline(); + // filter out the return value of exec for (Object result : pipelinedResults) { if (!"OK".equals(result) && !("QUEUED").equals(result)) { results.add(result); diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionTransactionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionTransactionIntegrationTests.java index 00f4baa72..26851fa87 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionTransactionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionTransactionIntegrationTests.java @@ -15,11 +15,6 @@ */ package org.springframework.data.redis.connection.lettuce; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -90,15 +85,6 @@ public class LettuceConnectionTransactionIntegrationTests extends public void testBRPopLPushTimeout() { } - @Test - public void testDbSize() { - connection.set("dbparam", "foo"); - assertNull(connection.dbSize()); - List results = getResults(); - assertEquals(1, results.size()); - assertTrue((Long) results.get(0) > 0); - } - @Test public void exceptionExecuteNative() throws Exception { actual.add(connection.execute("ZadD", getClass() + "#foo\t0.90\titem")); @@ -112,13 +98,6 @@ public class LettuceConnectionTransactionIntegrationTests extends } protected List getResults() { - List results = new ArrayList(); - List txResults = connection.exec(); - for (Object result : txResults) { - if (!"OK".equals(result) && !("QUEUED").equals(result)) { - results.add(result); - } - } - return results; + return connection.exec(); } } diff --git a/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionIntegrationTests.java index e52476a33..b3d203192 100644 --- a/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionIntegrationTests.java @@ -53,22 +53,14 @@ public class SrpConnectionIntegrationTests extends AbstractConnectionIntegration connection = null; } - @Ignore("DATAREDIS-123, exec does not return command results") - public void testMultiExec() throws Exception { - } - - @Ignore("DATAREDIS-123, exec does not return command results") + @Ignore("DATAREDIS-169 SRP discard does not clear txReplies, results in inconsistent results on next tx exec") public void testMultiDiscard() { } - @Ignore("DATAREDIS-123, exec does not return command results") + @Ignore("DATAREDIS-168 SRP exec throws TransactionFailedException if watched value modified") public void testWatch() { } - @Ignore("DATAREDIS-123, exec does not return command results") - public void testUnwatch() { - } - @Ignore("DATAREDIS-130, sort not working") public void testSort() { } diff --git a/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionPipelineIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionPipelineIntegrationTests.java index 3da9a1133..10f0522d0 100644 --- a/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionPipelineIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionPipelineIntegrationTests.java @@ -114,7 +114,7 @@ public class SrpConnectionPipelineIntegrationTests extends @Test public void testInfo() throws Exception { assertNull(connection.info()); - List results = connection.closePipeline(); + List results = getResults(); assertEquals(1, results.size()); Properties info = SrpUtils.info(new BulkReply((byte[]) results.get(0))); assertTrue("at least 5 settings should be present", info.size() >= 5); @@ -176,7 +176,7 @@ public class SrpConnectionPipelineIntegrationTests extends super.testGetRangeSetRange(); } - private int getRedisVersion() { + protected int getRedisVersion() { connection.closePipeline(); int version = RedisClientBase.parseVersion((String)connection.info().get("redis_version")); connection.openPipeline(); diff --git a/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionPipelineTxIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionPipelineTxIntegrationTests.java new file mode 100644 index 000000000..5407fe4ce --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionPipelineTxIntegrationTests.java @@ -0,0 +1,90 @@ +/* + * Copyright 2011-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.srp; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.List; + +import org.junit.Test; +import org.springframework.data.redis.connection.RedisPipelineException; + +import redis.client.RedisClientBase; + +/** + * Integration test of {@link SrpConnection} transactions within a pipeline + * + * @author Jennifer Hickey + * + */ +public class SrpConnectionPipelineTxIntegrationTests extends + SrpConnectionTransactionIntegrationTests { + + @Test(expected = RedisPipelineException.class) + public void exceptionExecuteNative() throws Exception { + connection.execute("ZadD", getClass() + "#foo\t0.90\titem"); + getResults(); + } + + @Test + public void testUsePipelineAfterTxExec() { + connection.set("foo", "bar"); + assertNull(connection.exec()); + assertNull(connection.get("foo")); + List results = connection.closePipeline(); + assertEquals(2, results.size()); + assertEquals("OK", results.get(0)); + assertEquals("bar", new String((byte[]) results.get(1))); + assertEquals("bar", connection.get("foo")); + } + + @Test + public void testExec2TransactionsInPipeline() { + connection.set("foo", "bar"); + assertNull(connection.get("foo")); + assertNull(connection.exec()); + connection.multi(); + connection.set("foo", "baz"); + assertNull(connection.get("foo")); + assertNull(connection.exec()); + List results = connection.closePipeline(); + assertEquals(4, results.size()); + assertEquals("OK", results.get(0)); + assertEquals("bar", new String((byte[]) results.get(1))); + assertEquals("OK", results.get(2)); + assertEquals("baz", new String((byte[]) results.get(3))); + } + + protected void initConnection() { + connection.openPipeline(); + connection.multi(); + } + + protected List getResults() { + assertNull(connection.exec()); + return connection.closePipeline(); + } + + protected int getRedisVersion() { + connection.exec(); + connection.closePipeline(); + int version = RedisClientBase.parseVersion((String) connection.info() + .get("redis_version")); + initConnection(); + return version; + } +} diff --git a/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionTransactionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionTransactionIntegrationTests.java new file mode 100644 index 000000000..d0dc15e8b --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/srp/SrpConnectionTransactionIntegrationTests.java @@ -0,0 +1,103 @@ +/* + * Copyright 2011-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.srp; + +import java.util.List; + +import org.junit.Ignore; +import org.junit.Test; +import org.springframework.data.redis.RedisSystemException; + +import redis.client.RedisClientBase; + +/** + * Integration test of {@link SrpConnection} functionality within a transaction + * + * @author Jennifer Hickey + * + */ +public class SrpConnectionTransactionIntegrationTests extends + SrpConnectionPipelineIntegrationTests { + + @Ignore + public void testMultiDiscard() { + } + + @Ignore + public void testMultiExec() { + } + + @Ignore + public void testUnwatch() { + } + + @Ignore + public void testWatch() { + } + + /* + * Using blocking ops inside a tx does not make a lot of sense as it would + * require blocking the entire server in order to execute the block + * atomically, which in turn does not allow other clients to perform a push + * operation. + */ + + @Ignore + public void testBLPop() { + } + + @Ignore + public void testBRPop() { + } + + @Ignore + public void testBRPopLPush() { + } + + @Ignore + public void testBLPopTimeout() { + } + + @Ignore + public void testBRPopTimeout() { + } + + @Ignore + public void testBRPopLPushTimeout() { + } + + @Test(expected = RedisSystemException.class) + public void exceptionExecuteNative() throws Exception { + connection.execute("ZadD", getClass() + "#foo\t0.90\titem"); + getResults(); + } + + protected void initConnection() { + connection.multi(); + } + + protected List getResults() { + return connection.exec(); + } + + protected int getRedisVersion() { + connection.exec(); + int version = RedisClientBase.parseVersion((String) connection.info() + .get("redis_version")); + connection.multi(); + return version; + } +}