Share native connection between LettuceConnections

to minimize socket usage

DATAREDIS-126
This commit is contained in:
Jennifer Hickey
2013-03-20 14:04:45 -07:00
parent f4f9f06356
commit 0a702bf4af
5 changed files with 674 additions and 280 deletions

View File

@@ -18,36 +18,58 @@ package org.springframework.data.redis.connection.lettuce;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.util.Assert;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisException;
/**
* Connection factory creating <a href="http://github.com/wg/lettuce">Lettuce</a>-based connections.
*
* Connection factory creating <a
* href="http://github.com/wg/lettuce">Lettuce</a>-based connections.
* <p>
* This factory creates a new {@link LettuceConnection} on each call to
* {@link #getConnection()}. Multiple {@link LettuceConnection}s share a single
* thread-safe native connection.
*
* <p>
* The shared native connection is never closed by {@link LettuceConnection},
* therefore it is not validated by default on {@link #getConnection()}. Use
* {@link #setValidateConnection(boolean)} to change this behavior if necessary.
*
* @author Costin Leau
* @author Jennifer Hickey
*/
public class LettuceConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
public class LettuceConnectionFactory implements InitializingBean, DisposableBean,
RedisConnectionFactory {
private final Log log = LogFactory.getLog(getClass());
private String hostName = "localhost";
private int port = 6379;
private RedisClient client;
private long timeout = TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS);
private boolean validateConnection = false;
private RedisAsyncConnection<byte[], byte[]> connection;
private int dbIndex = 0;
/**
* Constructs a new <code>LettuceConnectionFactory</code> instance
* with default settings.
* Constructs a new <code>LettuceConnectionFactory</code> instance with
* default settings.
*/
public LettuceConnectionFactory() {
}
/**
* Constructs a new <code>LettuceConnectionFactory</code> instance
* with default settings.
* Constructs a new <code>LettuceConnectionFactory</code> instance with
* default settings.
*/
public LettuceConnectionFactory(String host, int port) {
this.hostName = host;
@@ -57,6 +79,8 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
public void afterPropertiesSet() {
client = new RedisClient(hostName, port);
client.setDefaultTimeout(timeout, TimeUnit.MILLISECONDS);
// open a single connection to be shared for non-blocking and non-tx ops
initConnection();
}
public void destroy() {
@@ -64,7 +88,15 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
}
public RedisConnection getConnection() {
return new LettuceConnection(client.connectAsync(LettuceUtils.CODEC), timeout, client);
if (validateConnection) {
try {
new com.lambdaworks.redis.RedisConnection<byte[], byte[]>(connection).ping();
} catch (RedisException e) {
log.warn("Validation of shared connection failed. Creating a new connection.");
initConnection();
}
}
return new LettuceConnection(connection, timeout, client);
}
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
@@ -83,7 +115,8 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
/**
* Sets the host.
*
* @param host the host to set
* @param host
* the host to set
*/
public void setHostName(String host) {
this.hostName = host;
@@ -101,7 +134,8 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
/**
* Sets the port.
*
* @param port the port to set
* @param port
* the port to set
*/
public void setPort(int port) {
this.port = port;
@@ -109,7 +143,7 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
/**
* Returns the connection timeout (in milliseconds).
*
*
* @return connection timeout
*/
public long getTimeout() {
@@ -118,10 +152,68 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
/**
* Sets the connection timeout (in milliseconds).
*
* @param timeout connection timeout
*
* @param timeout
* connection timeout
*/
public void setTimeout(long timeout) {
this.timeout = timeout;
}
/**
* Indicates if validation of the native Lettuce connection is enabled
*
* @return connection validation enabled
*/
public boolean getValidateConnection() {
return validateConnection;
}
/**
* Enables validation of the shared native Lettuce connection on calls to
* {@link #getConnection()}. A new connection will be created and used if
* validation fails.
* <p>
* Lettuce will automatically reconnect until close is called, which should
* never happen through {@link LettuceConnection}, therefore the default is
* false.
* <p>
* Setting this to true will result in a round-trip call to the server on
* each new connection, so this setting should only be used if there is code
* that is actively closing the native Lettuce connection.
*
* @param validateConnection
* enable connection validation
*/
public void setValidateConnection(boolean validateConnection) {
this.validateConnection = validateConnection;
}
/**
* Returns the index of the database.
*
* @return Returns the database index
*/
public int getDatabase() {
return dbIndex;
}
/**
* Sets the index of the database used by this connection factory. Default
* is 0.
*
* @param index
* database index
*/
public void setDatabase(int index) {
Assert.isTrue(index >= 0, "invalid DB index (a positive index required)");
this.dbIndex = index;
}
protected void initConnection() {
connection = client.connectAsync(LettuceUtils.CODEC);
if (dbIndex > 0) {
connection.select(dbIndex);
}
}
}

View File

@@ -0,0 +1,122 @@
/*
* Copyright 2011-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertNotSame;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.SettingsUtils;
import org.springframework.data.redis.connection.DefaultStringRedisConnection;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.StringRedisConnection;
import com.lambdaworks.redis.RedisAsyncConnection;
/**
* Integration test of {@link LettuceConnectionFactory}
*
* @author Jennifer Hickey
*
*/
public class LettuceConnectionFactoryTests {
private LettuceConnectionFactory factory;
private StringRedisConnection connection;
@Before
public void setUp() {
factory = new LettuceConnectionFactory(SettingsUtils.getHost(), SettingsUtils.getPort());
factory.afterPropertiesSet();
connection = new DefaultStringRedisConnection(factory.getConnection());
}
@After
public void tearDown() {
factory.destroy();
}
@SuppressWarnings("rawtypes")
@Test
public void testGetNewConnectionOnError() throws Exception {
factory.setValidateConnection(true);
connection.lPush("alist", "baz");
RedisAsyncConnection nativeConn = (RedisAsyncConnection) connection.getNativeConnection();
nativeConn.close();
// Give some time for async channel close
Thread.sleep(500);
connection.bLPop(1, "alist".getBytes());
try {
connection.get("test3");
fail("Expected exception using natively closed conn");
} catch (RedisSystemException e) {
// expected, shared conn is closed
}
DefaultStringRedisConnection conn2 = new DefaultStringRedisConnection(
factory.getConnection());
assertNotSame(nativeConn, conn2.getNativeConnection());
conn2.set("anotherkey", "anothervalue");
assertEquals("anothervalue", conn2.get("anotherkey"));
}
@SuppressWarnings("rawtypes")
@Test
public void testConnectionErrorNoValidate() throws Exception {
connection.lPush("ablist", "baz");
((RedisAsyncConnection) connection.getNativeConnection()).close();
// Give some time for async channel close
Thread.sleep(500);
DefaultStringRedisConnection conn2 = new DefaultStringRedisConnection(
factory.getConnection());
try {
conn2.set("anotherkey", "anothervalue");
fail("Expected exception using natively closed conn");
} catch (RedisSystemException e) {
// expected, as we are re-using the natively closed conn
}
}
@Test
public void testValidateNoError() {
factory.setValidateConnection(true);
RedisConnection conn2 = factory.getConnection();
assertSame(connection.getNativeConnection(), conn2.getNativeConnection());
}
@Test
public void testSelectDb() {
LettuceConnectionFactory factory2 = new LettuceConnectionFactory(SettingsUtils.getHost(),
SettingsUtils.getPort());
factory2.setDatabase(1);
factory2.afterPropertiesSet();
StringRedisConnection connection2 = new DefaultStringRedisConnection(
factory2.getConnection());
connection2.flushDb();
// put an item in database 0
connection.set("sometestkey", "sometestvalue");
try {
// there should still be nothing in database 1
assertEquals(Long.valueOf(0), connection2.dbSize());
} finally {
factory2.destroy();
}
}
}

View File

@@ -16,11 +16,18 @@
package org.springframework.data.redis.connection.lettuce;
import java.util.List;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.AbstractConnectionIntegrationTests;
import org.springframework.data.redis.connection.DefaultStringRedisConnection;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
* Integration test of {@link LettuceConnection}
@@ -40,4 +47,86 @@ public class LettuceConnectionIntegrationTests extends AbstractConnectionIntegra
@Ignore("DATAREDIS-122 exec never returns null")
public void testUnwatch() throws Exception {
}
@Test
public void testMultiThreadsOneBlocking() throws Exception {
Thread th = new Thread(new Runnable() {
public void run() {
DefaultStringRedisConnection conn2 = new DefaultStringRedisConnection(
connectionFactory.getConnection());
conn2.openPipeline();
conn2.bLPop(3, "multilist");
conn2.closePipeline();
conn2.close();
}
});
th.start();
Thread.sleep(1000);
connection.set("heythere", "hi");
th.join();
assertEquals("hi", connection.get("heythere"));
}
@Test
public void testMultiConnectionsOneInTx() throws Exception {
connection.set("txs1", "rightnow");
connection.multi();
connection.set("txs1", "delay");
DefaultStringRedisConnection conn2 = new DefaultStringRedisConnection(
connectionFactory.getConnection());
// We get immediate results executing command in separate conn (not part
// of tx)
conn2.set("txs2", "hi");
assertEquals("hi", conn2.get("txs2"));
// Transactional value not yet set
assertEquals("rightnow", conn2.get("txs1"));
connection.exec();
// Now it should be set
assertEquals("delay", conn2.get("txs1"));
}
@Test
public void testCloseInTransaction() {
connection.multi();
connection.close();
try {
connection.exec();
fail("Expected exception resuming tx");
} catch (RedisSystemException e) {
// expected, can't resume tx after closing conn
}
// can do normal ops after closing
connection.get("txclose");
// can complete a new tx after closing
connection.multi();
connection.set("txclose", "bar");
List<Object> results = connection.exec();
assertEquals("OK", results.get(0));
}
@Test
public void testCloseBlockingOps() {
connection.lPush("what", "baz");
connection.bLPop(1, "what".getBytes());
connection.close();
// can do blocking ops after closing
connection.lPush("what", "boo");
connection.bLPop(1, "what".getBytes());
// we can do regular ops
connection.get("somekey");
// we can start a tx
connection.multi();
}
@Test(expected = UnsupportedOperationException.class)
public void testSelect() {
connection.select(1);
}
}

View File

@@ -16,10 +16,10 @@
package org.springframework.data.redis.connection.lettuce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.util.ArrayList;
import java.util.Arrays;
@@ -34,6 +34,7 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.data.redis.connection.AbstractConnectionPipelineIntegrationTests;
import org.springframework.data.redis.connection.DefaultStringRedisConnection;
import org.springframework.data.redis.connection.DefaultStringTuple;
import org.springframework.data.redis.connection.StringRedisConnection.StringTuple;
import org.springframework.test.annotation.IfProfileValue;
@@ -69,6 +70,47 @@ public class LettuceConnectionPipelineIntegrationTests extends
// Overrides, usually due to return values being Long vs Boolean or Set vs
// List
@Test
public void testBLPop() {
// Use a different synchronous connection to add items to the list, else blpop may
// execute before items in the list (LettuceConnection uses different underlying conns for blocking ops)
DefaultStringRedisConnection conn2 = new DefaultStringRedisConnection(
connectionFactory.getConnection());
conn2.lPush("poplist", "foo");
conn2.lPush("poplist", "bar");
actual.add(connection.bLPop(1, "poplist", "otherlist"));
verifyResults(Arrays.asList(new Object[] {Arrays.asList(new String[] {"poplist", "bar"})}), actual);
}
@Test
public void testBRPop() {
// Use a different synchronous connection to add items to the list, else blpop may
// execute before items in the list (LettuceConnection uses different underlying conns for blocking ops)
DefaultStringRedisConnection conn2 = new DefaultStringRedisConnection(
connectionFactory.getConnection());
conn2.rPush("rpoplist", "bar");
conn2.rPush("rpoplist", "foo");
actual.add(connection.bRPop(1, "rpoplist"));
verifyResults(Arrays.asList(new Object[] {Arrays.asList(new String[] {"rpoplist", "foo"})}), actual);
}
@Test
public void testBRPopLPush() {
// Use a different synchronous connection to add items to the list, else blpop may
// execute before items in the list (LettuceConnection uses different underlying conns for blocking ops)
DefaultStringRedisConnection conn2 = new DefaultStringRedisConnection(
connectionFactory.getConnection());
conn2.rPush("PopList", "hello");
conn2.rPush("PopList", "world");
conn2.rPush("pop2", "hey");
assertNull(connection.bRPopLPush(1, "PopList", "pop2"));
List<Object> results = convertResults();
assertEquals(Arrays.asList(new String[] { "world" }), results);
assertEquals(Arrays.asList(new String[] { "hello" }), connection.lRange("PopList", 0, -1));
assertEquals(Arrays.asList(new String[] { "world", "hey" }),
connection.lRange("pop2", 0, -1));
}
@Test
public void testHSetGet() throws Exception {
String hash = getClass() + ":hashtest";