From 7c96b80e54b45be2b6894ab3dc9cd931667e9a9b Mon Sep 17 00:00:00 2001 From: Jennifer Hickey Date: Mon, 15 Apr 2013 14:27:28 -0700 Subject: [PATCH] Make LettuceConnectionFactory thread-safe --- .../lettuce/LettuceConnectionFactory.java | 61 +++++++++++++++---- .../LettuceConnectionFactoryTests.java | 28 +++++++++ 2 files changed, 77 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java index 4c6bd5f23..5c2e59dba 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java @@ -60,6 +60,8 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea private boolean shareNativeConnection = true; private RedisAsyncConnection connection; private int dbIndex = 0; + /** Synchronization monitor for the shared Connection */ + private final Object connectionMonitor = new Object(); /** * Constructs a new LettuceConnectionFactory instance with @@ -80,9 +82,13 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea public void afterPropertiesSet() { client = new RedisClient(hostName, port); client.setDefaultTimeout(timeout, TimeUnit.MILLISECONDS); + if (shareNativeConnection) { + initConnection(); + } } public void destroy() { + resetConnection(); client.shutdown(); } @@ -94,6 +100,40 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea return new LettuceConnection(nativeConnection, timeout, client, !shareNativeConnection); } + public void initConnection() { + synchronized (this.connectionMonitor) { + if (this.connection != null) { + resetConnection(); + } + this.connection = client.connectAsync(LettuceUtils.CODEC); + } + } + + /** + * Reset the underlying shared Connection, to be reinitialized on next + * access. + */ + public void resetConnection() { + synchronized (this.connectionMonitor) { + connection.close(); + this.connection = null; + } + } + + /** + * Validate the shared Connection and reinitialize if invalid + */ + public void validateConnection() { + synchronized (this.connectionMonitor) { + try { + new com.lambdaworks.redis.RedisConnection(connection).ping(); + } catch (RedisException e) { + log.warn("Validation of shared connection failed. Creating a new connection."); + initConnection(); + } + } + } + public DataAccessException translateExceptionIfPossible(RuntimeException ex) { return LettuceUtils.convertRedisAccessException(ex); } @@ -229,20 +269,17 @@ public class LettuceConnectionFactory implements InitializingBean, DisposableBea } protected RedisAsyncConnection getNativeConnection() { - if(shareNativeConnection) { - if(connection == null) { - connection = client.connectAsync(LettuceUtils.CODEC); - } - if (validateConnection) { - try { - new com.lambdaworks.redis.RedisConnection(connection).ping(); - } catch (RedisException e) { - log.warn("Validation of shared connection failed. Creating a new connection."); - connection = client.connectAsync(LettuceUtils.CODEC); + if (shareNativeConnection) { + synchronized (this.connectionMonitor) { + if (this.connection == null) { + initConnection(); } + if (validateConnection) { + validateConnection(); + } + return this.connection; } - return connection; - }else { + } else { return client.connectAsync(LettuceUtils.CODEC); } } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java index 8687550e2..55b896d1c 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java @@ -141,4 +141,32 @@ public class LettuceConnectionFactoryTests { // expected } } + + @SuppressWarnings("unchecked") + @Test + public void testResetConnection() { + RedisAsyncConnection nativeConn = (RedisAsyncConnection) connection + .getNativeConnection(); + factory.resetConnection(); + assertNotSame(nativeConn, factory.getConnection().getNativeConnection()); + } + + @SuppressWarnings("unchecked") + @Test + public void testInitConnection() { + RedisAsyncConnection nativeConn = (RedisAsyncConnection) connection + .getNativeConnection(); + factory.initConnection(); + assertNotSame(nativeConn, factory.getConnection().getNativeConnection()); + } + + @SuppressWarnings("unchecked") + @Test + public void testResetAndInitConnection() { + RedisAsyncConnection nativeConn = (RedisAsyncConnection) connection + .getNativeConnection(); + factory.resetConnection(); + factory.initConnection(); + assertNotSame(nativeConn, factory.getConnection().getNativeConnection()); + } }