(getAsyncBlockingConnection());
+ }
+ return blockingConn;
+ }
}
\ No newline at end of file
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
index 1f1eff642..779cbe932 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
@@ -18,36 +18,58 @@ package org.springframework.data.redis.connection.lettuce;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.util.Assert;
+import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisClient;
+import com.lambdaworks.redis.RedisException;
/**
- * Connection factory creating Lettuce-based connections.
- *
+ * Connection factory creating Lettuce-based connections.
+ *
+ * This factory creates a new {@link LettuceConnection} on each call to
+ * {@link #getConnection()}. Multiple {@link LettuceConnection}s share a single
+ * thread-safe native connection.
+ *
+ *
+ * The shared native connection is never closed by {@link LettuceConnection},
+ * therefore it is not validated by default on {@link #getConnection()}. Use
+ * {@link #setValidateConnection(boolean)} to change this behavior if necessary.
+ *
* @author Costin Leau
+ * @author Jennifer Hickey
*/
-public class LettuceConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
+public class LettuceConnectionFactory implements InitializingBean, DisposableBean,
+ RedisConnectionFactory {
+
+ private final Log log = LogFactory.getLog(getClass());
private String hostName = "localhost";
private int port = 6379;
private RedisClient client;
private long timeout = TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS);
-
+ private boolean validateConnection = false;
+ private RedisAsyncConnection connection;
+ private int dbIndex = 0;
+
/**
- * Constructs a new LettuceConnectionFactory instance
- * with default settings.
+ * Constructs a new LettuceConnectionFactory instance with
+ * default settings.
*/
public LettuceConnectionFactory() {
}
/**
- * Constructs a new LettuceConnectionFactory instance
- * with default settings.
+ * Constructs a new LettuceConnectionFactory instance with
+ * default settings.
*/
public LettuceConnectionFactory(String host, int port) {
this.hostName = host;
@@ -57,6 +79,8 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
public void afterPropertiesSet() {
client = new RedisClient(hostName, port);
client.setDefaultTimeout(timeout, TimeUnit.MILLISECONDS);
+ // open a single connection to be shared for non-blocking and non-tx ops
+ initConnection();
}
public void destroy() {
@@ -64,7 +88,15 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
}
public RedisConnection getConnection() {
- return new LettuceConnection(client.connectAsync(LettuceUtils.CODEC), timeout, client);
+ if (validateConnection) {
+ try {
+ new com.lambdaworks.redis.RedisConnection(connection).ping();
+ } catch (RedisException e) {
+ log.warn("Validation of shared connection failed. Creating a new connection.");
+ initConnection();
+ }
+ }
+ return new LettuceConnection(connection, timeout, client);
}
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
@@ -83,7 +115,8 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
/**
* Sets the host.
*
- * @param host the host to set
+ * @param host
+ * the host to set
*/
public void setHostName(String host) {
this.hostName = host;
@@ -101,7 +134,8 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
/**
* Sets the port.
*
- * @param port the port to set
+ * @param port
+ * the port to set
*/
public void setPort(int port) {
this.port = port;
@@ -109,7 +143,7 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
/**
* Returns the connection timeout (in milliseconds).
- *
+ *
* @return connection timeout
*/
public long getTimeout() {
@@ -118,10 +152,68 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
/**
* Sets the connection timeout (in milliseconds).
- *
- * @param timeout connection timeout
+ *
+ * @param timeout
+ * connection timeout
*/
public void setTimeout(long timeout) {
this.timeout = timeout;
}
+
+ /**
+ * Indicates if validation of the native Lettuce connection is enabled
+ *
+ * @return connection validation enabled
+ */
+ public boolean getValidateConnection() {
+ return validateConnection;
+ }
+
+ /**
+ * Enables validation of the shared native Lettuce connection on calls to
+ * {@link #getConnection()}. A new connection will be created and used if
+ * validation fails.
+ *
+ * Lettuce will automatically reconnect until close is called, which should
+ * never happen through {@link LettuceConnection}, therefore the default is
+ * false.
+ *
+ * Setting this to true will result in a round-trip call to the server on
+ * each new connection, so this setting should only be used if there is code
+ * that is actively closing the native Lettuce connection.
+ *
+ * @param validateConnection
+ * enable connection validation
+ */
+ public void setValidateConnection(boolean validateConnection) {
+ this.validateConnection = validateConnection;
+ }
+
+ /**
+ * Returns the index of the database.
+ *
+ * @return Returns the database index
+ */
+ public int getDatabase() {
+ return dbIndex;
+ }
+
+ /**
+ * Sets the index of the database used by this connection factory. Default
+ * is 0.
+ *
+ * @param index
+ * database index
+ */
+ public void setDatabase(int index) {
+ Assert.isTrue(index >= 0, "invalid DB index (a positive index required)");
+ this.dbIndex = index;
+ }
+
+ protected void initConnection() {
+ connection = client.connectAsync(LettuceUtils.CODEC);
+ if (dbIndex > 0) {
+ connection.select(dbIndex);
+ }
+ }
}
\ No newline at end of file
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java
new file mode 100644
index 000000000..3a1f3d2d0
--- /dev/null
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2011-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.redis.connection.lettuce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertNotSame;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.data.redis.RedisSystemException;
+import org.springframework.data.redis.SettingsUtils;
+import org.springframework.data.redis.connection.DefaultStringRedisConnection;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.StringRedisConnection;
+
+import com.lambdaworks.redis.RedisAsyncConnection;
+
+/**
+ * Integration test of {@link LettuceConnectionFactory}
+ *
+ * @author Jennifer Hickey
+ *
+ */
+public class LettuceConnectionFactoryTests {
+
+ private LettuceConnectionFactory factory;
+
+ private StringRedisConnection connection;
+
+ @Before
+ public void setUp() {
+ factory = new LettuceConnectionFactory(SettingsUtils.getHost(), SettingsUtils.getPort());
+ factory.afterPropertiesSet();
+ connection = new DefaultStringRedisConnection(factory.getConnection());
+ }
+
+ @After
+ public void tearDown() {
+ factory.destroy();
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetNewConnectionOnError() throws Exception {
+ factory.setValidateConnection(true);
+ connection.lPush("alist", "baz");
+ RedisAsyncConnection nativeConn = (RedisAsyncConnection) connection.getNativeConnection();
+ nativeConn.close();
+ // Give some time for async channel close
+ Thread.sleep(500);
+ connection.bLPop(1, "alist".getBytes());
+ try {
+ connection.get("test3");
+ fail("Expected exception using natively closed conn");
+ } catch (RedisSystemException e) {
+ // expected, shared conn is closed
+ }
+ DefaultStringRedisConnection conn2 = new DefaultStringRedisConnection(
+ factory.getConnection());
+ assertNotSame(nativeConn, conn2.getNativeConnection());
+ conn2.set("anotherkey", "anothervalue");
+ assertEquals("anothervalue", conn2.get("anotherkey"));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testConnectionErrorNoValidate() throws Exception {
+ connection.lPush("ablist", "baz");
+ ((RedisAsyncConnection) connection.getNativeConnection()).close();
+ // Give some time for async channel close
+ Thread.sleep(500);
+ DefaultStringRedisConnection conn2 = new DefaultStringRedisConnection(
+ factory.getConnection());
+ try {
+ conn2.set("anotherkey", "anothervalue");
+ fail("Expected exception using natively closed conn");
+ } catch (RedisSystemException e) {
+ // expected, as we are re-using the natively closed conn
+ }
+ }
+
+ @Test
+ public void testValidateNoError() {
+ factory.setValidateConnection(true);
+ RedisConnection conn2 = factory.getConnection();
+ assertSame(connection.getNativeConnection(), conn2.getNativeConnection());
+ }
+
+ @Test
+ public void testSelectDb() {
+ LettuceConnectionFactory factory2 = new LettuceConnectionFactory(SettingsUtils.getHost(),
+ SettingsUtils.getPort());
+ factory2.setDatabase(1);
+ factory2.afterPropertiesSet();
+ StringRedisConnection connection2 = new DefaultStringRedisConnection(
+ factory2.getConnection());
+ connection2.flushDb();
+ // put an item in database 0
+ connection.set("sometestkey", "sometestvalue");
+ try {
+ // there should still be nothing in database 1
+ assertEquals(Long.valueOf(0), connection2.dbSize());
+ } finally {
+ factory2.destroy();
+ }
+ }
+}
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java
index ff834a946..19818b89e 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java
@@ -16,11 +16,18 @@
package org.springframework.data.redis.connection.lettuce;
+import java.util.List;
+
import org.junit.Ignore;
+import org.junit.Test;
import org.junit.runner.RunWith;
+import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.AbstractConnectionIntegrationTests;
+import org.springframework.data.redis.connection.DefaultStringRedisConnection;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
* Integration test of {@link LettuceConnection}
@@ -40,4 +47,86 @@ public class LettuceConnectionIntegrationTests extends AbstractConnectionIntegra
@Ignore("DATAREDIS-122 exec never returns null")
public void testUnwatch() throws Exception {
}
+
+ @Test
+ public void testMultiThreadsOneBlocking() throws Exception {
+ Thread th = new Thread(new Runnable() {
+ public void run() {
+ DefaultStringRedisConnection conn2 = new DefaultStringRedisConnection(
+ connectionFactory.getConnection());
+ conn2.openPipeline();
+ conn2.bLPop(3, "multilist");
+ conn2.closePipeline();
+ conn2.close();
+ }
+ });
+ th.start();
+ Thread.sleep(1000);
+ connection.set("heythere", "hi");
+ th.join();
+ assertEquals("hi", connection.get("heythere"));
+ }
+
+ @Test
+ public void testMultiConnectionsOneInTx() throws Exception {
+ connection.set("txs1", "rightnow");
+ connection.multi();
+ connection.set("txs1", "delay");
+ DefaultStringRedisConnection conn2 = new DefaultStringRedisConnection(
+ connectionFactory.getConnection());
+
+ // We get immediate results executing command in separate conn (not part
+ // of tx)
+ conn2.set("txs2", "hi");
+ assertEquals("hi", conn2.get("txs2"));
+
+ // Transactional value not yet set
+ assertEquals("rightnow", conn2.get("txs1"));
+ connection.exec();
+
+ // Now it should be set
+ assertEquals("delay", conn2.get("txs1"));
+ }
+
+ @Test
+ public void testCloseInTransaction() {
+ connection.multi();
+ connection.close();
+ try {
+ connection.exec();
+ fail("Expected exception resuming tx");
+ } catch (RedisSystemException e) {
+ // expected, can't resume tx after closing conn
+ }
+ // can do normal ops after closing
+ connection.get("txclose");
+
+ // can complete a new tx after closing
+ connection.multi();
+ connection.set("txclose", "bar");
+ List