improve handling of staleness exception during connection init

DATAREDIS-109
This commit is contained in:
Costin Leau
2012-11-29 17:06:53 +02:00
parent 3f7d1b8baa
commit d22684dfda
3 changed files with 80 additions and 76 deletions

View File

@@ -114,8 +114,15 @@ public class JedisConnection implements RedisConnection {
this.dbIndex = dbIndex;
// select the db
// if this fail, do manual clean-up before propagating the exception
// as we're inside the constructor
if (dbIndex > 0) {
select(dbIndex);
try {
select(dbIndex);
} catch (DataAccessException ex) {
close();
throw ex;
}
}
}

View File

@@ -21,7 +21,7 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.util.Assert;
@@ -98,7 +98,7 @@ public class JedisConnectionFactory implements InitializingBean, DisposableBean,
jedis.connect();
return jedis;
} catch (Exception ex) {
throw new DataAccessResourceFailureException("Cannot get Jedis connection", ex);
throw new RedisConnectionFailureException("Cannot get Jedis connection", ex);
}
}
@@ -146,11 +146,10 @@ public class JedisConnectionFactory implements InitializingBean, DisposableBean,
public JedisConnection getConnection() {
Jedis jedis = fetchJedisConnector();
return postProcessConnection((usePool ? new JedisConnection(jedis, pool, dbIndex) : new JedisConnection(jedis,
null, dbIndex)));
return postProcessConnection((usePool ? new JedisConnection(jedis, pool, dbIndex) : new JedisConnection(jedis, null, dbIndex)));
}
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
return JedisUtils.convertJedisAccessException(ex);
}

View File

@@ -87,7 +87,7 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
public RedisTemplate() {
}
public void afterPropertiesSet() {
super.afterPropertiesSet();
boolean defaultUsed = false;
@@ -116,7 +116,7 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
}
}
public <T> T execute(RedisCallback<T> action) {
return execute(action, isExposeConnection());
}
@@ -147,17 +147,18 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
Assert.notNull(action, "Callback object must not be null");
RedisConnectionFactory factory = getConnectionFactory();
RedisConnection conn = RedisConnectionUtils.getConnection(factory);
boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
preProcessConnection(conn, existingConnection);
boolean pipelineStatus = conn.isPipelined();
if (pipeline && !pipelineStatus) {
conn.openPipeline();
}
RedisConnection conn = null;
try {
conn = RedisConnectionUtils.getConnection(factory);
boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
preProcessConnection(conn, existingConnection);
boolean pipelineStatus = conn.isPipelined();
if (pipeline && !pipelineStatus) {
conn.openPipeline();
}
RedisConnection connToExpose = (exposeConnection ? conn : createRedisConnectionProxy(conn));
T result = action.doInRedis(connToExpose);
@@ -174,7 +175,7 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
}
public <T> T execute(SessionCallback<T> session) {
RedisConnectionFactory factory = getConnectionFactory();
// bind connection
@@ -407,23 +408,23 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
//
// RedisOperations
//
public List<Object> exec() {
return execute(new RedisCallback<List<Object>>() {
public List<Object> doInRedis(RedisConnection connection) throws DataAccessException {
return connection.exec();
}
});
}
public void delete(K key) {
final byte[] rawKey = rawKey(key);
execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) {
connection.del(rawKey);
return null;
@@ -431,12 +432,12 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
}, true);
}
public void delete(Collection<K> keys) {
final byte[][] rawKeys = rawKeys(keys);
execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) {
connection.del(rawKeys);
return null;
@@ -444,45 +445,45 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
}, true);
}
public Boolean hasKey(K key) {
final byte[] rawKey = rawKey(key);
return execute(new RedisCallback<Boolean>() {
public Boolean doInRedis(RedisConnection connection) {
return connection.exists(rawKey);
}
}, true);
}
public Boolean expire(K key, long timeout, TimeUnit unit) {
final byte[] rawKey = rawKey(key);
final long rawTimeout = unit.toSeconds(timeout);
return execute(new RedisCallback<Boolean>() {
public Boolean doInRedis(RedisConnection connection) {
return connection.expire(rawKey, rawTimeout);
}
}, true);
}
public Boolean expireAt(K key, Date date) {
final byte[] rawKey = rawKey(key);
final long rawTimeout = date.getTime() / 1000;
return execute(new RedisCallback<Boolean>() {
public Boolean doInRedis(RedisConnection connection) {
return connection.expireAt(rawKey, rawTimeout);
}
}, true);
}
public void convertAndSend(String channel, Object message) {
Assert.hasText(channel, "a non-empty channel is required");
@@ -490,7 +491,7 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
final byte[] rawMessage = rawValue(message);
execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) {
connection.publish(rawChannel, rawMessage);
return null;
@@ -503,12 +504,12 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
// Value operations
//
public Long getExpire(K key) {
final byte[] rawKey = rawKey(key);
return execute(new RedisCallback<Long>() {
public Long doInRedis(RedisConnection connection) {
return Long.valueOf(connection.ttl(rawKey));
}
@@ -516,12 +517,11 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
}
@SuppressWarnings("unchecked")
public Set<K> keys(K pattern) {
final byte[] rawKey = rawKey(pattern);
Set<byte[]> rawKeys = execute(new RedisCallback<Set<byte[]>>() {
public Set<byte[]> doInRedis(RedisConnection connection) {
return connection.keys(rawKey);
}
@@ -530,34 +530,34 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
return SerializationUtils.deserialize(rawKeys, keySerializer);
}
public Boolean persist(K key) {
final byte[] rawKey = rawKey(key);
return execute(new RedisCallback<Boolean>() {
public Boolean doInRedis(RedisConnection connection) {
return connection.persist(rawKey);
}
}, true);
}
public Boolean move(K key, final int dbIndex) {
final byte[] rawKey = rawKey(key);
return execute(new RedisCallback<Boolean>() {
public Boolean doInRedis(RedisConnection connection) {
return connection.move(rawKey, dbIndex);
}
}, true);
}
public K randomKey() {
byte[] rawKey = execute(new RedisCallback<byte[]>() {
public byte[] doInRedis(RedisConnection connection) {
return connection.randomKey();
}
@@ -566,13 +566,13 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
return deserializeKey(rawKey);
}
public void rename(K oldKey, K newKey) {
final byte[] rawOldKey = rawKey(oldKey);
final byte[] rawNewKey = rawKey(newKey);
execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) {
connection.rename(rawOldKey, rawNewKey);
return null;
@@ -580,35 +580,35 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
}, true);
}
public Boolean renameIfAbsent(K oldKey, K newKey) {
final byte[] rawOldKey = rawKey(oldKey);
final byte[] rawNewKey = rawKey(newKey);
return execute(new RedisCallback<Boolean>() {
public Boolean doInRedis(RedisConnection connection) {
return connection.renameNX(rawOldKey, rawNewKey);
}
}, true);
}
public DataType type(K key) {
final byte[] rawKey = rawKey(key);
return execute(new RedisCallback<DataType>() {
public DataType doInRedis(RedisConnection connection) {
return connection.type(rawKey);
}
}, true);
}
public void multi() {
execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) throws DataAccessException {
connection.multi();
return null;
@@ -616,11 +616,11 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
}, true);
}
public void discard() {
execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) throws DataAccessException {
connection.discard();
return null;
@@ -628,12 +628,12 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
}, true);
}
public void watch(K key) {
final byte[] rawKey = rawKey(key);
execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) {
connection.watch(rawKey);
return null;
@@ -641,12 +641,12 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
}, true);
}
public void watch(Collection<K> keys) {
final byte[][] rawKeys = rawKeys(keys);
execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) {
connection.watch(rawKeys);
return null;
@@ -654,10 +654,10 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
}, true);
}
public void unwatch() {
execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) throws DataAccessException {
connection.unwatch();
return null;
@@ -668,18 +668,17 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
// Sort operations
@SuppressWarnings("unchecked")
public List<V> sort(SortQuery<K> query) {
return sort(query, valueSerializer);
}
public <T> List<T> sort(SortQuery<K> query, RedisSerializer<T> resultSerializer) {
final byte[] rawKey = rawKey(query.getKey());
final SortParameters params = QueryUtils.convertQuery(query, stringSerializer);
List<byte[]> vals = execute(new RedisCallback<List<byte[]>>() {
public List<byte[]> doInRedis(RedisConnection connection) throws DataAccessException {
return connection.sort(rawKey, params);
}
@@ -689,12 +688,11 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
}
@SuppressWarnings("unchecked")
public <T> List<T> sort(SortQuery<K> query, BulkMapper<T, V> bulkMapper) {
return sort(query, bulkMapper, valueSerializer);
}
public <T, S> List<T> sort(SortQuery<K> query, BulkMapper<T, S> bulkMapper, RedisSerializer<S> resultSerializer) {
List<S> values = sort(query, resultSerializer);
@@ -719,26 +717,26 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
return result;
}
public Long sort(SortQuery<K> query, K storeKey) {
final byte[] rawStoreKey = rawKey(storeKey);
final byte[] rawKey = rawKey(query.getKey());
final SortParameters params = QueryUtils.convertQuery(query, stringSerializer);
return execute(new RedisCallback<Long>() {
public Long doInRedis(RedisConnection connection) throws DataAccessException {
return connection.sort(rawKey, params, rawStoreKey);
}
}, true);
}
public BoundValueOperations<K, V> boundValueOps(K key) {
return new DefaultBoundValueOperations<K, V>(key, this);
}
public ValueOperations<K, V> opsForValue() {
if (valueOps == null) {
valueOps = new DefaultValueOperations<K, V>(this);
@@ -746,7 +744,7 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
return valueOps;
}
public ListOperations<K, V> opsForList() {
if (listOps == null) {
listOps = new DefaultListOperations<K, V>(this);
@@ -754,17 +752,17 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
return listOps;
}
public BoundListOperations<K, V> boundListOps(K key) {
return new DefaultBoundListOperations<K, V>(key, this);
}
public BoundSetOperations<K, V> boundSetOps(K key) {
return new DefaultBoundSetOperations<K, V>(key, this);
}
public SetOperations<K, V> opsForSet() {
if (setOps == null) {
setOps = new DefaultSetOperations<K, V>(this);
@@ -772,12 +770,12 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
return setOps;
}
public BoundZSetOperations<K, V> boundZSetOps(K key) {
return new DefaultBoundZSetOperations<K, V>(key, this);
}
public ZSetOperations<K, V> opsForZSet() {
if (zSetOps == null) {
zSetOps = new DefaultZSetOperations<K, V>(this);
@@ -785,12 +783,12 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
return zSetOps;
}
public <HK, HV> BoundHashOperations<K, HK, HV> boundHashOps(K key) {
return new DefaultBoundHashOperations<K, HK, HV>(key, this);
}
public <HK, HV> HashOperations<K, HK, HV> opsForHash() {
return new DefaultHashOperations<K, HK, HV>(this);
}