Make LettuceConnectionFactory thread-safe
This commit is contained in:
@@ -60,6 +60,8 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
|
||||
private boolean shareNativeConnection = true;
|
||||
private RedisAsyncConnection<byte[], byte[]> connection;
|
||||
private int dbIndex = 0;
|
||||
/** Synchronization monitor for the shared Connection */
|
||||
private final Object connectionMonitor = new Object();
|
||||
|
||||
/**
|
||||
* Constructs a new <code>LettuceConnectionFactory</code> instance with
|
||||
@@ -80,9 +82,13 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
|
||||
public void afterPropertiesSet() {
|
||||
client = new RedisClient(hostName, port);
|
||||
client.setDefaultTimeout(timeout, TimeUnit.MILLISECONDS);
|
||||
if (shareNativeConnection) {
|
||||
initConnection();
|
||||
}
|
||||
}
|
||||
|
||||
public void destroy() {
|
||||
resetConnection();
|
||||
client.shutdown();
|
||||
}
|
||||
|
||||
@@ -94,6 +100,40 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
|
||||
return new LettuceConnection(nativeConnection, timeout, client, !shareNativeConnection);
|
||||
}
|
||||
|
||||
public void initConnection() {
|
||||
synchronized (this.connectionMonitor) {
|
||||
if (this.connection != null) {
|
||||
resetConnection();
|
||||
}
|
||||
this.connection = client.connectAsync(LettuceUtils.CODEC);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the underlying shared Connection, to be reinitialized on next
|
||||
* access.
|
||||
*/
|
||||
public void resetConnection() {
|
||||
synchronized (this.connectionMonitor) {
|
||||
connection.close();
|
||||
this.connection = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate the shared Connection and reinitialize if invalid
|
||||
*/
|
||||
public void validateConnection() {
|
||||
synchronized (this.connectionMonitor) {
|
||||
try {
|
||||
new com.lambdaworks.redis.RedisConnection<byte[], byte[]>(connection).ping();
|
||||
} catch (RedisException e) {
|
||||
log.warn("Validation of shared connection failed. Creating a new connection.");
|
||||
initConnection();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
|
||||
return LettuceUtils.convertRedisAccessException(ex);
|
||||
}
|
||||
@@ -229,20 +269,17 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea
|
||||
}
|
||||
|
||||
protected RedisAsyncConnection<byte[], byte[]> getNativeConnection() {
|
||||
if(shareNativeConnection) {
|
||||
if(connection == null) {
|
||||
connection = client.connectAsync(LettuceUtils.CODEC);
|
||||
}
|
||||
if (validateConnection) {
|
||||
try {
|
||||
new com.lambdaworks.redis.RedisConnection<byte[], byte[]>(connection).ping();
|
||||
} catch (RedisException e) {
|
||||
log.warn("Validation of shared connection failed. Creating a new connection.");
|
||||
connection = client.connectAsync(LettuceUtils.CODEC);
|
||||
if (shareNativeConnection) {
|
||||
synchronized (this.connectionMonitor) {
|
||||
if (this.connection == null) {
|
||||
initConnection();
|
||||
}
|
||||
if (validateConnection) {
|
||||
validateConnection();
|
||||
}
|
||||
return this.connection;
|
||||
}
|
||||
return connection;
|
||||
}else {
|
||||
} else {
|
||||
return client.connectAsync(LettuceUtils.CODEC);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,4 +141,32 @@ public class LettuceConnectionFactoryTests {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testResetConnection() {
|
||||
RedisAsyncConnection<byte[], byte[]> nativeConn = (RedisAsyncConnection<byte[], byte[]>) connection
|
||||
.getNativeConnection();
|
||||
factory.resetConnection();
|
||||
assertNotSame(nativeConn, factory.getConnection().getNativeConnection());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testInitConnection() {
|
||||
RedisAsyncConnection<byte[], byte[]> nativeConn = (RedisAsyncConnection<byte[], byte[]>) connection
|
||||
.getNativeConnection();
|
||||
factory.initConnection();
|
||||
assertNotSame(nativeConn, factory.getConnection().getNativeConnection());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testResetAndInitConnection() {
|
||||
RedisAsyncConnection<byte[], byte[]> nativeConn = (RedisAsyncConnection<byte[], byte[]>) connection
|
||||
.getNativeConnection();
|
||||
factory.resetConnection();
|
||||
factory.initConnection();
|
||||
assertNotSame(nativeConn, factory.getConnection().getNativeConnection());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user