+ beefed up Redis template
+ added support for thread/transaction bound support
This commit is contained in:
@@ -52,4 +52,8 @@ public class MyRedisAccessor implements InitializingBean {
|
||||
public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
|
||||
this.connectionFactory = connectionFactory;
|
||||
}
|
||||
}
|
||||
|
||||
public RuntimeException tryToConvertRedisAccessException(Exception ex) {
|
||||
throw new UnsupportedOperationException("wire this into dialects/XXXClient utils");
|
||||
}
|
||||
}
|
||||
@@ -15,8 +15,17 @@
|
||||
*/
|
||||
package org.springframework.datastore.redis.core;
|
||||
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Proxy;
|
||||
|
||||
import org.springframework.datastore.redis.core.connection.RedisConnection;
|
||||
import org.springframework.datastore.redis.core.connection.RedisConnectionFactory;
|
||||
import org.springframework.datastore.redis.support.converter.RedisConverter;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -36,6 +45,7 @@ import org.springframework.datastore.redis.core.connection.RedisConnectionFactor
|
||||
public class MyRedisTemplate extends MyRedisAccessor {
|
||||
|
||||
private boolean exposeConnection = false;
|
||||
private RedisConverter converter = null;
|
||||
|
||||
public MyRedisTemplate() {
|
||||
}
|
||||
@@ -46,7 +56,48 @@ public class MyRedisTemplate extends MyRedisAccessor {
|
||||
}
|
||||
|
||||
public <T> T execute(MyRedisCallback<T> action) {
|
||||
throw new UnsupportedOperationException();
|
||||
return execute(action, isExposeConnection());
|
||||
}
|
||||
|
||||
|
||||
public <T> T execute(MyRedisCallback<T> action, boolean exposeConnection) {
|
||||
Assert.notNull(action, "Callback object must not be null");
|
||||
|
||||
RedisConnectionFactory factory = getConnectionFactory();
|
||||
RedisConnection<?> conn = RedisConnectionUtils.getRedisConnection(factory);
|
||||
|
||||
boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
|
||||
|
||||
try {
|
||||
RedisConnection<?> connToExpose = (exposeConnection ? conn : createRedisConnectionProxy(conn));
|
||||
T result = action.doInRedis(connToExpose);
|
||||
// TODO: should do flush?
|
||||
return postProcessResult(result, conn, existingConnection);
|
||||
} catch (Exception ex) {
|
||||
// TODO: too generic ?
|
||||
throw tryToConvertRedisAccessException(ex);
|
||||
} finally {
|
||||
RedisConnectionUtils.releaseConnection(conn, factory);
|
||||
}
|
||||
}
|
||||
|
||||
protected RedisConnection<?> createRedisConnectionProxy(RedisConnection<?> pm) {
|
||||
Class<?>[] ifcs = ClassUtils.getAllInterfacesForClass(pm.getClass(), getClass().getClassLoader());
|
||||
return (RedisConnection<?>) Proxy.newProxyInstance(pm.getClass().getClassLoader(), ifcs,
|
||||
new CloseSuppressingInvocationHandler(pm));
|
||||
}
|
||||
|
||||
protected <T> T postProcessResult(T result, RedisConnection<?> pm, boolean existingConnection) {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the exposeConnection.
|
||||
*
|
||||
* @return Returns the exposeConnection
|
||||
*/
|
||||
public boolean isExposeConnection() {
|
||||
return exposeConnection;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -59,4 +110,47 @@ public class MyRedisTemplate extends MyRedisAccessor {
|
||||
public void setExposeConnection(boolean exposeConnection) {
|
||||
this.exposeConnection = exposeConnection;
|
||||
}
|
||||
}
|
||||
|
||||
public void setRedisConverter(RedisConverter converter) {
|
||||
this.converter = converter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invocation handler that suppresses close calls on JDO PersistenceManagers.
|
||||
* Also prepares returned Query objects.
|
||||
* @see RedisConnection#close()
|
||||
*/
|
||||
private class CloseSuppressingInvocationHandler implements InvocationHandler {
|
||||
|
||||
private final RedisConnection<?> target;
|
||||
|
||||
public CloseSuppressingInvocationHandler(RedisConnection<?> target) {
|
||||
this.target = target;
|
||||
}
|
||||
|
||||
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
|
||||
// Invocation on PersistenceManager interface (or provider-specific extension) coming in...
|
||||
|
||||
if (method.getName().equals("equals")) {
|
||||
// Only consider equal when proxies are identical.
|
||||
return (proxy == args[0]);
|
||||
}
|
||||
else if (method.getName().equals("hashCode")) {
|
||||
// Use hashCode of PersistenceManager proxy.
|
||||
return System.identityHashCode(proxy);
|
||||
}
|
||||
else if (method.getName().equals("close")) {
|
||||
// Handle close method: suppress, not valid.
|
||||
return null;
|
||||
}
|
||||
|
||||
// Invoke method on target RedisConnection.
|
||||
try {
|
||||
Object retVal = method.invoke(this.target, args);
|
||||
return retVal;
|
||||
} catch (InvocationTargetException ex) {
|
||||
throw ex.getTargetException();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,134 @@
|
||||
/*
|
||||
* Copyright 2006-2009 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.datastore.redis.core;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.datastore.redis.core.connection.RedisConnection;
|
||||
import org.springframework.datastore.redis.core.connection.RedisConnectionFactory;
|
||||
import org.springframework.transaction.support.ResourceHolder;
|
||||
import org.springframework.transaction.support.ResourceHolderSynchronization;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Helper class featuring {@link RedisConnection} handling, allowing for reuse of instances within transactions.
|
||||
*
|
||||
* @author Costin Leau
|
||||
*/
|
||||
public abstract class RedisConnectionUtils {
|
||||
|
||||
private static final Log log = LogFactory.getLog(RedisConnectionUtils.class);
|
||||
|
||||
public static RedisConnection<?> getRedisConnection(RedisConnectionFactory factory) {
|
||||
return doGetRedisConnection(factory, true);
|
||||
}
|
||||
|
||||
public static RedisConnection<?> doGetRedisConnection(RedisConnectionFactory factory, boolean allowCreate) {
|
||||
Assert.notNull(factory, "No RedisConnectionFactory specified");
|
||||
|
||||
RedisConnectionHolder pmHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
|
||||
//TODO: investigate tx synchronization
|
||||
|
||||
if (pmHolder != null)
|
||||
return pmHolder.getConnection();
|
||||
|
||||
if (log.isDebugEnabled())
|
||||
log.debug("Opening RedisConnection");
|
||||
|
||||
RedisConnection<?> conn = factory.getConnection();
|
||||
|
||||
if (TransactionSynchronizationManager.isSynchronizationActive()) {
|
||||
pmHolder = new RedisConnectionHolder(conn);
|
||||
TransactionSynchronizationManager.registerSynchronization(new RedisConnectionSynchronization(pmHolder,
|
||||
factory, true));
|
||||
TransactionSynchronizationManager.bindResource(factory, pmHolder);
|
||||
|
||||
}
|
||||
return pmHolder.getConnection();
|
||||
|
||||
}
|
||||
|
||||
public static void releaseConnection(RedisConnection<?> conn, RedisConnectionFactory factory) {
|
||||
if (conn == null) {
|
||||
return;
|
||||
}
|
||||
// Only release non-transactional/non-bound connections.
|
||||
if (!isConnectionTransactional(conn, factory)) {
|
||||
log.debug("Closing Redis Connection");
|
||||
conn.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isConnectionTransactional(RedisConnection<?> conn, RedisConnectionFactory connFactory) {
|
||||
if (connFactory == null) {
|
||||
return false;
|
||||
}
|
||||
RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(connFactory);
|
||||
return (connHolder != null && conn == connHolder.getConnection());
|
||||
}
|
||||
|
||||
private static class RedisConnectionSynchronization extends
|
||||
ResourceHolderSynchronization<RedisConnectionHolder, RedisConnectionFactory> {
|
||||
|
||||
private final boolean newRedisConnection;
|
||||
|
||||
public RedisConnectionSynchronization(RedisConnectionHolder connHolder, RedisConnectionFactory connFactory,
|
||||
boolean newRedisConnection) {
|
||||
super(connHolder, connFactory);
|
||||
this.newRedisConnection = newRedisConnection;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldUnbindAtCompletion() {
|
||||
return this.newRedisConnection;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void releaseResource(RedisConnectionHolder resourceHolder, RedisConnectionFactory resourceKey) {
|
||||
releaseConnection(resourceHolder.getConnection(), resourceKey);
|
||||
}
|
||||
}
|
||||
|
||||
private static class RedisConnectionHolder implements ResourceHolder {
|
||||
|
||||
private boolean isVoid = false;
|
||||
private final RedisConnection<?> conn;
|
||||
|
||||
public RedisConnectionHolder(RedisConnection<?> conn) {
|
||||
this.conn = conn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isVoid() {
|
||||
return isVoid;
|
||||
}
|
||||
|
||||
public RedisConnection<?> getConnection() {
|
||||
return conn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
// no-op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unbound() {
|
||||
this.isVoid = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user