Fix Jedis and RJC continuing to use pipeline after closed
DATAREDIS-162, DATAREDIS-163
This commit is contained in:
@@ -256,6 +256,7 @@ public class JedisConnection implements RedisConnection {
|
||||
public List<Object> closePipeline() {
|
||||
if (pipeline != null) {
|
||||
List<Object> execute = pipeline.syncAndReturnAll();
|
||||
pipeline = null;
|
||||
if (execute != null && !execute.isEmpty()) {
|
||||
Exception cause = null;
|
||||
for (int i = 0; i < execute.size(); i++) {
|
||||
|
||||
@@ -142,6 +142,7 @@ public class RjcConnection implements RedisConnection {
|
||||
|
||||
public List<Object> closePipeline() {
|
||||
if (pipeline != null) {
|
||||
pipeline = null;
|
||||
try {
|
||||
return RjcUtils.maybeConvert(client.getAll());
|
||||
} catch (Exception ex) {
|
||||
|
||||
@@ -1165,7 +1165,7 @@ public abstract class AbstractConnectionIntegrationTests {
|
||||
assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
private boolean exists(String key, long timeout) {
|
||||
protected boolean exists(String key, long timeout) {
|
||||
boolean exists = true;
|
||||
for (long currentTime = System.currentTimeMillis(); System.currentTimeMillis()
|
||||
- currentTime < timeout;) {
|
||||
|
||||
@@ -17,10 +17,10 @@
|
||||
package org.springframework.data.redis.connection;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@@ -103,15 +103,10 @@ abstract public class AbstractConnectionPipelineIntegrationTests extends
|
||||
public void testPubSubWithPatterns() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(expected = RedisPipelineException.class)
|
||||
public void exceptionExecuteNative() throws Exception {
|
||||
connection.execute("ZadD", getClass() + "#foo\t0.90\titem");
|
||||
try {
|
||||
connection.closePipeline();
|
||||
fail("Expected a RedisPipelineException to be thrown");
|
||||
}catch(RedisPipelineException e) {
|
||||
// expected
|
||||
}
|
||||
connection.closePipeline();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -126,9 +121,8 @@ abstract public class AbstractConnectionPipelineIntegrationTests extends
|
||||
public void testExpire() throws Exception {
|
||||
connection.set("exp", "true");
|
||||
actual.add(connection.expire("exp", 1));
|
||||
Thread.sleep(2000);
|
||||
actual.add(connection.exists("exp"));
|
||||
verifyResults(Arrays.asList(new Object[] { 1l, 0l }), actual);
|
||||
verifyResults(Arrays.asList(new Object[] { 1l }), actual);
|
||||
assertFalse(exists("exp", 2500l));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -136,9 +130,8 @@ abstract public class AbstractConnectionPipelineIntegrationTests extends
|
||||
public void testExpireAt() throws Exception {
|
||||
connection.set("exp2", "true");
|
||||
actual.add(connection.expireAt("exp2", System.currentTimeMillis() / 1000 + 1));
|
||||
Thread.sleep(2000);
|
||||
actual.add(connection.exists("exp2"));
|
||||
verifyResults(Arrays.asList(new Object[] { 1l, 0l }), actual);
|
||||
verifyResults(Arrays.asList(new Object[] { 1l }), actual);
|
||||
assertFalse(exists("exp2", 2500l));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -147,9 +140,9 @@ abstract public class AbstractConnectionPipelineIntegrationTests extends
|
||||
connection.set("exp3", "true");
|
||||
actual.add(connection.expire("exp3", 1));
|
||||
actual.add(connection.persist("exp3"));
|
||||
verifyResults(Arrays.asList(new Object[] { 1l, 1l }), actual);
|
||||
Thread.sleep(1500);
|
||||
actual.add(connection.exists("exp3"));
|
||||
verifyResults(Arrays.asList(new Object[] { 1l, 1l, 1l }), actual);
|
||||
assertTrue(connection.exists("exp3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -157,9 +150,8 @@ abstract public class AbstractConnectionPipelineIntegrationTests extends
|
||||
public void testSetEx() throws Exception {
|
||||
connection.setEx("expy", 1l, "yep");
|
||||
actual.add(connection.get("expy"));
|
||||
Thread.sleep(2000);
|
||||
actual.add(connection.exists("expy"));
|
||||
verifyResults(Arrays.asList(new Object[] { "yep", 0l }), actual);
|
||||
verifyResults(Arrays.asList(new Object[] { "yep" }), actual);
|
||||
assertFalse(exists("expy", 2500l));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -31,7 +31,6 @@ import org.springframework.data.redis.RedisSystemException;
|
||||
import org.springframework.data.redis.connection.AbstractConnectionPipelineIntegrationTests;
|
||||
import org.springframework.data.redis.connection.DefaultStringTuple;
|
||||
import org.springframework.data.redis.connection.StringRedisConnection.StringTuple;
|
||||
import org.springframework.test.annotation.IfProfileValue;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
@@ -92,14 +91,6 @@ public class JedisConnectionPipelineIntegrationTests extends
|
||||
public void testMultiDiscard() {
|
||||
}
|
||||
|
||||
@Ignore("DATAREDIS-155 Exists returns true after key is supposed to expire")
|
||||
public void testExpire() throws Exception {
|
||||
}
|
||||
|
||||
@Ignore("DATAREDIS-155 Exists returns true after key is supposed to expire")
|
||||
public void testSetEx() throws Exception {
|
||||
}
|
||||
|
||||
// Unsupported Ops
|
||||
@Test(expected = RedisSystemException.class)
|
||||
public void testBitSet() throws Exception {
|
||||
@@ -230,27 +221,6 @@ public class JedisConnectionPipelineIntegrationTests extends
|
||||
verifyResults(Arrays.asList(new Object[] { 1l, 1l, value1, expected }), actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
@IfProfileValue(name = "runLongTests", value = "true")
|
||||
public void testExpireAt() throws Exception {
|
||||
connection.set("exp2", "true");
|
||||
actual.add(connection.expireAt("exp2", System.currentTimeMillis() / 1000 + 1));
|
||||
Thread.sleep(2000);
|
||||
actual.add(connection.exists("exp2"));
|
||||
verifyResults(Arrays.asList(new Object[] { 1l, false }), actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
@IfProfileValue(name = "runLongTests", value = "true")
|
||||
public void testPersist() throws Exception {
|
||||
connection.set("exp3", "true");
|
||||
actual.add(connection.expire("exp3", 1));
|
||||
actual.add(connection.persist("exp3"));
|
||||
Thread.sleep(1500);
|
||||
actual.add(connection.exists("exp3"));
|
||||
verifyResults(Arrays.asList(new Object[] { 1l, 1l, true }), actual);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
protected Object convertResult(Object result) {
|
||||
Object convertedResult = super.convertResult(result);
|
||||
|
||||
@@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@@ -88,7 +89,7 @@ public class LettuceConnectionPipelineIntegrationTests extends
|
||||
@Test
|
||||
public void testInfo() throws Exception {
|
||||
assertNull(connection.info());
|
||||
List<Object> results = connection.closePipeline();
|
||||
List<Object> results = getResults();
|
||||
assertEquals(1, results.size());
|
||||
Properties info = LettuceUtils.info((String) results.get(0));
|
||||
assertTrue("at least 5 settings should be present", info.size() >= 5);
|
||||
@@ -199,9 +200,9 @@ public class LettuceConnectionPipelineIntegrationTests extends
|
||||
connection.set("exp3", "true");
|
||||
actual.add(connection.expire("exp3", 1));
|
||||
actual.add(connection.persist("exp3"));
|
||||
verifyResults(Arrays.asList(new Object[] { true, true }), actual);
|
||||
Thread.sleep(1500);
|
||||
actual.add(connection.exists("exp3"));
|
||||
verifyResults(Arrays.asList(new Object[] { true, true, true }), actual);
|
||||
assertTrue(connection.exists("exp3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -209,9 +210,8 @@ public class LettuceConnectionPipelineIntegrationTests extends
|
||||
public void testExpireAt() throws Exception {
|
||||
connection.set("exp2", "true");
|
||||
actual.add(connection.expireAt("exp2", System.currentTimeMillis() / 1000 + 1));
|
||||
Thread.sleep(2000);
|
||||
actual.add(connection.exists("exp2"));
|
||||
verifyResults(Arrays.asList(new Object[] { true, false }), actual);
|
||||
verifyResults(Arrays.asList(new Object[] { true}), actual);
|
||||
assertFalse(exists("exp2", 2500l));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -219,19 +219,8 @@ public class LettuceConnectionPipelineIntegrationTests extends
|
||||
public void testExpire() throws Exception {
|
||||
connection.set("exp", "true");
|
||||
actual.add(connection.expire("exp", 1));
|
||||
Thread.sleep(2000);
|
||||
actual.add(connection.exists("exp"));
|
||||
verifyResults(Arrays.asList(new Object[] { true, false }), actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
@IfProfileValue(name = "runLongTests", value = "true")
|
||||
public void testSetEx() throws Exception {
|
||||
connection.setEx("expy", 1l, "yep");
|
||||
actual.add(connection.get("expy"));
|
||||
Thread.sleep(2000);
|
||||
actual.add(connection.exists("expy"));
|
||||
verifyResults(Arrays.asList(new Object[] { "yep", false }), actual);
|
||||
verifyResults(Arrays.asList(new Object[] { true }), actual);
|
||||
assertFalse(exists("exp", 2500));
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
|
||||
@@ -24,7 +24,6 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
@@ -48,11 +47,6 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
public class RjcConnectionPipelineIntegrationTests extends
|
||||
AbstractConnectionPipelineIntegrationTests {
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Ignore("DATAREDIS-134 string ops do not work with encoded values")
|
||||
public void testGetRangeSetRange() {
|
||||
}
|
||||
@@ -125,6 +119,11 @@ public class RjcConnectionPipelineIntegrationTests extends
|
||||
public void testType() {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("DATAREDIS-161 Syntax error in pipelined RJC op causes SocketTimeouts on subsequent calls")
|
||||
public void exceptionExecuteNative() throws Exception {
|
||||
}
|
||||
|
||||
// Overrides, usually due to return values being Long vs Boolean or Set vs
|
||||
// List
|
||||
|
||||
|
||||
Reference in New Issue
Block a user