From b92513de613e1f6d7cd2ece9b370c925c583d9f2 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 4 Nov 2010 18:55:57 +0200 Subject: [PATCH] + beefed up Redis template + added support for thread/transaction bound support --- .../datastore/redis/core/MyRedisAccessor.java | 6 +- .../datastore/redis/core/MyRedisTemplate.java | 98 ++++++++++++- .../redis/core/RedisConnectionUtils.java | 134 ++++++++++++++++++ 3 files changed, 235 insertions(+), 3 deletions(-) create mode 100644 spring-datastore-redis/src/main/java/org/springframework/datastore/redis/core/RedisConnectionUtils.java diff --git a/spring-datastore-redis/src/main/java/org/springframework/datastore/redis/core/MyRedisAccessor.java b/spring-datastore-redis/src/main/java/org/springframework/datastore/redis/core/MyRedisAccessor.java index 09a270d7e..8de894f55 100644 --- a/spring-datastore-redis/src/main/java/org/springframework/datastore/redis/core/MyRedisAccessor.java +++ b/spring-datastore-redis/src/main/java/org/springframework/datastore/redis/core/MyRedisAccessor.java @@ -52,4 +52,8 @@ public class MyRedisAccessor implements InitializingBean { public void setConnectionFactory(RedisConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } -} + + public RuntimeException tryToConvertRedisAccessException(Exception ex) { + throw new UnsupportedOperationException("wire this into dialects/XXXClient utils"); + } +} \ No newline at end of file diff --git a/spring-datastore-redis/src/main/java/org/springframework/datastore/redis/core/MyRedisTemplate.java b/spring-datastore-redis/src/main/java/org/springframework/datastore/redis/core/MyRedisTemplate.java index db94d56fb..434c6333c 100644 --- a/spring-datastore-redis/src/main/java/org/springframework/datastore/redis/core/MyRedisTemplate.java +++ b/spring-datastore-redis/src/main/java/org/springframework/datastore/redis/core/MyRedisTemplate.java @@ -15,8 +15,17 @@ */ package org.springframework.datastore.redis.core; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + import org.springframework.datastore.redis.core.connection.RedisConnection; import org.springframework.datastore.redis.core.connection.RedisConnectionFactory; +import org.springframework.datastore.redis.support.converter.RedisConverter; +import org.springframework.transaction.support.TransactionSynchronizationManager; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; /** * @@ -36,6 +45,7 @@ import org.springframework.datastore.redis.core.connection.RedisConnectionFactor public class MyRedisTemplate extends MyRedisAccessor { private boolean exposeConnection = false; + private RedisConverter converter = null; public MyRedisTemplate() { } @@ -46,7 +56,48 @@ public class MyRedisTemplate extends MyRedisAccessor { } public T execute(MyRedisCallback action) { - throw new UnsupportedOperationException(); + return execute(action, isExposeConnection()); + } + + + public T execute(MyRedisCallback action, boolean exposeConnection) { + Assert.notNull(action, "Callback object must not be null"); + + RedisConnectionFactory factory = getConnectionFactory(); + RedisConnection conn = RedisConnectionUtils.getRedisConnection(factory); + + boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); + + try { + RedisConnection connToExpose = (exposeConnection ? conn : createRedisConnectionProxy(conn)); + T result = action.doInRedis(connToExpose); + // TODO: should do flush? + return postProcessResult(result, conn, existingConnection); + } catch (Exception ex) { + // TODO: too generic ? + throw tryToConvertRedisAccessException(ex); + } finally { + RedisConnectionUtils.releaseConnection(conn, factory); + } + } + + protected RedisConnection createRedisConnectionProxy(RedisConnection pm) { + Class[] ifcs = ClassUtils.getAllInterfacesForClass(pm.getClass(), getClass().getClassLoader()); + return (RedisConnection) Proxy.newProxyInstance(pm.getClass().getClassLoader(), ifcs, + new CloseSuppressingInvocationHandler(pm)); + } + + protected T postProcessResult(T result, RedisConnection pm, boolean existingConnection) { + return result; + } + + /** + * Returns the exposeConnection. + * + * @return Returns the exposeConnection + */ + public boolean isExposeConnection() { + return exposeConnection; } /** @@ -59,4 +110,47 @@ public class MyRedisTemplate extends MyRedisAccessor { public void setExposeConnection(boolean exposeConnection) { this.exposeConnection = exposeConnection; } -} + + public void setRedisConverter(RedisConverter converter) { + this.converter = converter; + } + + /** + * Invocation handler that suppresses close calls on JDO PersistenceManagers. + * Also prepares returned Query objects. + * @see RedisConnection#close() + */ + private class CloseSuppressingInvocationHandler implements InvocationHandler { + + private final RedisConnection target; + + public CloseSuppressingInvocationHandler(RedisConnection target) { + this.target = target; + } + + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + // Invocation on PersistenceManager interface (or provider-specific extension) coming in... + + if (method.getName().equals("equals")) { + // Only consider equal when proxies are identical. + return (proxy == args[0]); + } + else if (method.getName().equals("hashCode")) { + // Use hashCode of PersistenceManager proxy. + return System.identityHashCode(proxy); + } + else if (method.getName().equals("close")) { + // Handle close method: suppress, not valid. + return null; + } + + // Invoke method on target RedisConnection. + try { + Object retVal = method.invoke(this.target, args); + return retVal; + } catch (InvocationTargetException ex) { + throw ex.getTargetException(); + } + } + } +} \ No newline at end of file diff --git a/spring-datastore-redis/src/main/java/org/springframework/datastore/redis/core/RedisConnectionUtils.java b/spring-datastore-redis/src/main/java/org/springframework/datastore/redis/core/RedisConnectionUtils.java new file mode 100644 index 000000000..526371d85 --- /dev/null +++ b/spring-datastore-redis/src/main/java/org/springframework/datastore/redis/core/RedisConnectionUtils.java @@ -0,0 +1,134 @@ +/* + * Copyright 2006-2009 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.datastore.redis.core; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.datastore.redis.core.connection.RedisConnection; +import org.springframework.datastore.redis.core.connection.RedisConnectionFactory; +import org.springframework.transaction.support.ResourceHolder; +import org.springframework.transaction.support.ResourceHolderSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; +import org.springframework.util.Assert; + +/** + * Helper class featuring {@link RedisConnection} handling, allowing for reuse of instances within transactions. + * + * @author Costin Leau + */ +public abstract class RedisConnectionUtils { + + private static final Log log = LogFactory.getLog(RedisConnectionUtils.class); + + public static RedisConnection getRedisConnection(RedisConnectionFactory factory) { + return doGetRedisConnection(factory, true); + } + + public static RedisConnection doGetRedisConnection(RedisConnectionFactory factory, boolean allowCreate) { + Assert.notNull(factory, "No RedisConnectionFactory specified"); + + RedisConnectionHolder pmHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory); + //TODO: investigate tx synchronization + + if (pmHolder != null) + return pmHolder.getConnection(); + + if (log.isDebugEnabled()) + log.debug("Opening RedisConnection"); + + RedisConnection conn = factory.getConnection(); + + if (TransactionSynchronizationManager.isSynchronizationActive()) { + pmHolder = new RedisConnectionHolder(conn); + TransactionSynchronizationManager.registerSynchronization(new RedisConnectionSynchronization(pmHolder, + factory, true)); + TransactionSynchronizationManager.bindResource(factory, pmHolder); + + } + return pmHolder.getConnection(); + + } + + public static void releaseConnection(RedisConnection conn, RedisConnectionFactory factory) { + if (conn == null) { + return; + } + // Only release non-transactional/non-bound connections. + if (!isConnectionTransactional(conn, factory)) { + log.debug("Closing Redis Connection"); + conn.close(); + } + } + + public static boolean isConnectionTransactional(RedisConnection conn, RedisConnectionFactory connFactory) { + if (connFactory == null) { + return false; + } + RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(connFactory); + return (connHolder != null && conn == connHolder.getConnection()); + } + + private static class RedisConnectionSynchronization extends + ResourceHolderSynchronization { + + private final boolean newRedisConnection; + + public RedisConnectionSynchronization(RedisConnectionHolder connHolder, RedisConnectionFactory connFactory, + boolean newRedisConnection) { + super(connHolder, connFactory); + this.newRedisConnection = newRedisConnection; + } + + @Override + protected boolean shouldUnbindAtCompletion() { + return this.newRedisConnection; + } + + @Override + protected void releaseResource(RedisConnectionHolder resourceHolder, RedisConnectionFactory resourceKey) { + releaseConnection(resourceHolder.getConnection(), resourceKey); + } + } + + private static class RedisConnectionHolder implements ResourceHolder { + + private boolean isVoid = false; + private final RedisConnection conn; + + public RedisConnectionHolder(RedisConnection conn) { + this.conn = conn; + } + + @Override + public boolean isVoid() { + return isVoid; + } + + public RedisConnection getConnection() { + return conn; + } + + @Override + public void reset() { + // no-op + } + + @Override + public void unbound() { + this.isVoid = true; + } + } +} \ No newline at end of file