+ add MULTI awareness to RedisConnection

This commit is contained in:
Costin Leau
2010-11-03 19:14:19 +02:00
parent 20dc9566ad
commit 27b1bf8af5
7 changed files with 127 additions and 29 deletions

View File

@@ -20,6 +20,8 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import com.sun.xml.internal.bind.v2.TODO;
/**
* Base class for {@link RedisTemplate} and
* other Redis-accessing DAO helpers, defining common properties such as
@@ -64,5 +66,4 @@ public class RedisAccessor implements InitializingBean {
public void afterPropertiesSet() {
Assert.notNull(getRedisClientFactory(), "RedisClientfactory is required");
}
}
}

View File

@@ -25,9 +25,9 @@ import java.util.Collection;
*/
public interface RedisCommands {
boolean exists(String key);
Boolean exists(String key);
int del(String... keys);
Integer del(String... keys);
DataType type(String key);
@@ -36,17 +36,17 @@ public interface RedisCommands {
String randomKey();
//TODO see whether the status code can be properly intercepted
boolean rename(String oldName, String newName);
Boolean rename(String oldName, String newName);
boolean renameNx(String oldName, String newName);
Boolean renameNx(String oldName, String newName);
int dbSize();
Integer dbSize();
boolean expire(String key, long seconds);
Boolean expire(String key, int seconds);
boolean persist(String key);
Boolean persist(String key);
int ttl(String key);
Integer ttl(String key);
void select(int dbIndex);

View File

@@ -39,4 +39,15 @@ public interface RedisConnection<T> extends RedisCommands, RedisHashCommands, Re
T getNativeConnection();
String getCharset();
/**
* Indicates whether the connection is in "queue"(or "MULTI") mode or not.
* When queueing, all commands are postponed until EXEC or DISCARD commands
* are issued.
* Since in queueing, no results are returned, the connection will return NULL
* on all operations that interact with the data.
*
* @return true if the connection is in queue/MULTI mode, false otherwise
*/
boolean isQueueing();
}

View File

@@ -23,5 +23,5 @@ package org.springframework.datastore.redis.core.connection;
*/
public interface RedisHashCommands {
int hSet(String key, String field, String value);
Integer hSet(String key, String field, String value);
}

View File

@@ -23,7 +23,7 @@ package org.springframework.datastore.redis.core.connection;
*/
public interface RedisListCommands {
int rPush(String key, String value);
Integer rPush(String key, String value);
int lPush(String key, String value);
Integer lPush(String key, String value);
}

View File

@@ -29,6 +29,7 @@ import org.springframework.util.ReflectionUtils;
import redis.clients.jedis.Client;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisException;
import redis.clients.jedis.Transaction;
/**
* Jedis based {@link RedisConnection}.
@@ -46,11 +47,13 @@ public class JedisConnection implements RedisConnection<Jedis> {
private final Jedis jedis;
private final Client client;
private final Transaction transaction;
public JedisConnection(Jedis jedis) {
this.jedis = jedis;
// extract underlying client for batch operations
client = (Client) ReflectionUtils.getField(CLIENT_FIELD, jedis);
transaction = new Transaction(client);
}
protected DataAccessException convertJedisAccessException(Exception ex) {
@@ -67,8 +70,12 @@ public class JedisConnection implements RedisConnection<Jedis> {
@Override
public void close() throws UncategorizedRedisException {
try {
jedis.disconnect();
if (isQueueing()) {
client.quit();
client.disconnect();
}
jedis.quit();
jedis.disconnect();
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
@@ -94,8 +101,17 @@ public class JedisConnection implements RedisConnection<Jedis> {
}
@Override
public int dbSize() {
public boolean isQueueing() {
return client.isInMulti();
}
@Override
public Integer dbSize() {
try {
if (isQueueing()) {
transaction.dbSize();
return null;
}
return jedis.dbSize();
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -103,8 +119,12 @@ public class JedisConnection implements RedisConnection<Jedis> {
}
@Override
public int del(String... keys) {
public Integer del(String... keys) {
try {
if (isQueueing()) {
transaction.del(keys);
return null;
}
return jedis.del(keys);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -130,8 +150,12 @@ public class JedisConnection implements RedisConnection<Jedis> {
}
@Override
public boolean exists(String key) {
public Boolean exists(String key) {
try {
if (isQueueing()) {
transaction.exists(key);
return null;
}
return (jedis.exists(key) == 1);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -139,8 +163,12 @@ public class JedisConnection implements RedisConnection<Jedis> {
}
@Override
public boolean expire(String key, long seconds) {
public Boolean expire(String key, int seconds) {
try {
if (isQueueing()) {
transaction.expire(key, seconds);
return null;
}
return (jedis.expire(key, (int) seconds) == 1);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -150,6 +178,10 @@ public class JedisConnection implements RedisConnection<Jedis> {
@Override
public Collection<String> keys(String pattern) {
try {
if (isQueueing()) {
transaction.keys(pattern);
return null;
}
return (jedis.keys(pattern));
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -159,15 +191,19 @@ public class JedisConnection implements RedisConnection<Jedis> {
@Override
public void multi() {
try {
jedis.multi();
client.multi();
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
}
@Override
public boolean persist(String key) {
public Boolean persist(String key) {
try {
if (isQueueing()) {
client.persist(key);
return null;
}
return (jedis.persist(key) == 1);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -177,6 +213,10 @@ public class JedisConnection implements RedisConnection<Jedis> {
@Override
public String randomKey() {
try {
if (isQueueing()) {
transaction.randomKey();
return null;
}
return jedis.randomKey();
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -184,18 +224,26 @@ public class JedisConnection implements RedisConnection<Jedis> {
}
@Override
public boolean rename(String oldName, String newName) {
public Boolean rename(String oldName, String newName) {
try {
return (JedisUtils.OK_CODE.equals(jedis.rename(oldName, newName)));
if (isQueueing()) {
transaction.rename(oldName, newName);
return null;
}
return (JedisUtils.isStatusOk(jedis.rename(oldName, newName)));
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
}
@Override
public boolean renameNx(String oldName, String newName) {
public Boolean renameNx(String oldName, String newName) {
try {
return (JedisUtils.OK_CODE.equals(jedis.renamenx(oldName, newName)));
if (isQueueing()) {
transaction.renamenx(oldName, newName);
return null;
}
return (jedis.renamenx(oldName, newName) == 1);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
@@ -204,6 +252,9 @@ public class JedisConnection implements RedisConnection<Jedis> {
@Override
public void select(int dbIndex) {
try {
if (isQueueing()) {
transaction.select(dbIndex);
}
jedis.select(dbIndex);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -211,8 +262,12 @@ public class JedisConnection implements RedisConnection<Jedis> {
}
@Override
public int ttl(String key) {
public Integer ttl(String key) {
try {
if (isQueueing()) {
transaction.ttl(key);
return null;
}
return jedis.ttl(key);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -222,6 +277,10 @@ public class JedisConnection implements RedisConnection<Jedis> {
@Override
public DataType type(String key) {
try {
if (isQueueing()) {
transaction.type(key);
return null;
}
return DataType.fromCode(jedis.type(key));
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -239,6 +298,11 @@ public class JedisConnection implements RedisConnection<Jedis> {
@Override
public void watch(String... keys) {
if (isQueueing()) {
// ignore (as watch not allowed in multi)
return;
}
try {
for (String key : keys) {
jedis.watch(key);
@@ -249,8 +313,12 @@ public class JedisConnection implements RedisConnection<Jedis> {
}
@Override
public int hSet(String key, String field, String value) {
public Integer hSet(String key, String field, String value) {
try {
if (isQueueing()) {
transaction.hset(key, field, value);
return null;
}
return jedis.hset(key, field, value);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -258,8 +326,12 @@ public class JedisConnection implements RedisConnection<Jedis> {
}
@Override
public int lPush(String key, String value) {
public Integer lPush(String key, String value) {
try {
if (isQueueing()) {
transaction.lpush(key, value);
return null;
}
return jedis.lpush(key, value);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -267,8 +339,12 @@ public class JedisConnection implements RedisConnection<Jedis> {
}
@Override
public int rPush(String key, String value) {
public Integer rPush(String key, String value) {
try {
if (isQueueing()) {
transaction.rpush(key, value);
return null;
}
return jedis.rpush(key, value);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
@@ -278,6 +354,11 @@ public class JedisConnection implements RedisConnection<Jedis> {
@Override
public String get(String key) {
try {
if (isQueueing()) {
transaction.get(key);
return null;
}
return jedis.get(key);
} catch (Exception ex) {
throw convertJedisAccessException(ex);

View File

@@ -34,7 +34,8 @@ import redis.clients.jedis.JedisException;
*/
public abstract class JedisUtils {
public static final String OK_CODE = "OK";
private static final String OK_CODE = "OK";
private static final String OK_MULTI_CODE = "+OK";
public static DataAccessException convertJedisAccessException(JedisException ex) {
return new InvalidDataAccessApiUsageException(ex.getMessage(), ex);
@@ -58,4 +59,8 @@ public abstract class JedisUtils {
static DataAccessException convertJedisAccessException(TimeoutException ex) {
throw new RedisConnectionFailureException("Jedis pool timed out. Could not get Redis Connection", ex);
}
static boolean isStatusOk(String status) {
return status != null && (OK_CODE.equals(status) || OK_MULTI_CODE.equals(status));
}
}