diff --git a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/RedisListeningContainer.java b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/RedisListeningContainer.java index 4edf416d5..683d3d914 100644 --- a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/RedisListeningContainer.java +++ b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/RedisListeningContainer.java @@ -28,11 +28,16 @@ import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.SmartLifecycle; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.core.task.TaskExecutor; import org.springframework.data.keyvalue.redis.connection.Message; import org.springframework.data.keyvalue.redis.connection.MessageListener; +import org.springframework.data.keyvalue.redis.connection.RedisConnection; import org.springframework.data.keyvalue.redis.connection.RedisConnectionFactory; import org.springframework.data.keyvalue.redis.serializer.RedisSerializer; import org.springframework.data.keyvalue.redis.serializer.StringRedisSerializer; +import org.springframework.scheduling.SchedulingAwareRunnable; +import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; /** @@ -44,7 +49,7 @@ import org.springframework.util.CollectionUtils; * the message dispatch being done through the task executor. * *

- * Note the container uses the connection only if at least one listener is configured. + * Note the container uses the connection in a lazy fashion (only if at least one listener is configured). * * @author Costin Leau */ @@ -52,7 +57,14 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean private static final Log log = LogFactory.getLog(RedisListeningContainer.class); - private Executor connectionWorker; + /** + * Default thread name prefix: "RedisListeningContainer-". + */ + public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisListeningContainer.class) + + "-"; + + + private Executor subscriptionExecutor; private Executor taskExecutor; @@ -60,9 +72,17 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean private String beanName; + private final Object monitor = new Object(); + // whether the container is running (or not) private volatile boolean running = false; + // whether the container has been initialized private volatile boolean initialized = false; + // whether the container uses a connection or not + // (as the container might be running but w/o listeners, it won't use any resources) + private volatile boolean listening = false; + + private volatile boolean manageExecutor = false; // lookup maps @@ -73,22 +93,49 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean // lookup map between channels and listeners private final Map> channelMapping = new ConcurrentHashMap>(); + private final SubscriptionTask subscriptionTask = new SubscriptionTask(); + private final MessageListener multiplexer = new DispatchMessageListener(); private RedisSerializer serializer = new StringRedisSerializer(); @Override - public void afterPropertiesSet() throws Exception { - //startListening(); + public void afterPropertiesSet() { + if (taskExecutor == null) { + manageExecutor = true; + taskExecutor = createDefaultTaskExecutor(); + } + + if (subscriptionExecutor == null) { + subscriptionExecutor = taskExecutor; + } + + start(); initialized = true; } + /** + * Creates a default TaskExecutor. Called if no explicit TaskExecutor has been specified. + *

The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} + * with the specified bean name (or the class name, if no bean name specified) as thread name prefix. + * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String) + */ + protected TaskExecutor createDefaultTaskExecutor() { + String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX); + return new SimpleAsyncTaskExecutor(threadNamePrefix); + } + @Override public void destroy() throws Exception { initialized = false; - // stop listening - //stopListening(); + stop(); + + if (manageExecutor) { + if (taskExecutor instanceof DisposableBean) { + ((DisposableBean) taskExecutor).destroy(); + } + } } @Override @@ -98,7 +145,8 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean @Override public void stop(Runnable callback) { - throw new UnsupportedOperationException(); + stop(); + callback.run(); } @Override @@ -114,11 +162,15 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean @Override public void start() { - throw new UnsupportedOperationException(); + if (!running) { + running = true; + lazyListen(); + } } @Override public void stop() { + running = false; throw new UnsupportedOperationException(); } @@ -145,7 +197,32 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean /** - * Sets the serializer for converting the raw channels and patterns into Strings. + * Sets the task executor used for running the message listeners when messages are received. + * If no task executor is set, an instance of {@link SimpleAsyncTaskExecutor} will be used by default. + * The task executor can be adjusted depending on the work done by the listeners and the number of + * messages coming in. + * + * @param taskExecutor The taskExecutor to set. + */ + public void setTaskExecutor(Executor taskExecutor) { + this.taskExecutor = taskExecutor; + } + + /** + * Sets the task execution used for subscribing to Redis channels. By default, if no executor is set, + * the {@link #setTaskExecutor(Executor)} will be used. In some cases, this might be undersired as + * the listening to the connection is a long running task. + * + *

Note: This implementation uses at most one thread (depending on whether there are any listeners registered or not). + * + * @param subscriptionExecutor The subscriptionExecutor to set. + */ + public void setSubscriptionExecutor(Executor subscriptionExecutor) { + this.subscriptionExecutor = subscriptionExecutor; + } + + /** + * Sets the serializer for converting the {@link Topic}s into low-level channels and patterns. * By default, {@link StringRedisSerializer} is used. * * @param serializer The serializer to set. @@ -176,7 +253,8 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean * @param topics message listener topic */ public void addMessageListener(MessageListener listener, Collection topics) { - + addListener(listener, topics); + lazyListen(); } private void initMapping(Map> listeners) { @@ -200,6 +278,29 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean } } + /** + * Method inspecting whether listening for messages (and thus using a thread) is actually needed. + */ + private void lazyListen() { + boolean debug = log.isDebugEnabled(); + + if (channelMapping.size() > 0 || patternMapping.size() > 0) { + subscriptionExecutor.execute(subscriptionTask); + listening = true; + + if (debug) { + log.debug("Started listening for Redis messages"); + } + } + else { + listening = false; + if (debug) { + log.debug("Postpone listening for Redis messages until actual listeners are added"); + } + } + } + + private void addListener(MessageListener listener, Collection topics) { for (Topic topic : topics) { @@ -229,6 +330,61 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean } } + /** + * Runnable used for Redis subscription. Implemented as a dedicated class to provide as many hints + * as possible to the underlying thread pool. + * + * @author Costin Leau + */ + private class SubscriptionTask implements SchedulingAwareRunnable { + + @Override + public boolean isLongLived() { + return true; + } + + @Override + public void run() { + RedisConnection connection = connectionFactory.getConnection(); + try { + if (connection.isSubscribed()) { + listening = false; + throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening"); + } + + // NB: each Xsubscribe call blocks + + // subscribe one way or the other + // and schedule the rest + if (!channelMapping.isEmpty()) { + connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet())); + } + else { + connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet())); + } + } finally { + if (connection != null) { + connection.close(); + } + } + } + + private byte[][] unwrap(Collection holders) { + if (CollectionUtils.isEmpty(holders)) { + return new byte[0][]; + } + + byte[][] unwrapped = new byte[holders.size()][]; + + int index = 0; + for (ArrayHolder arrayHolder : holders) { + unwrapped[index++] = arrayHolder.array; + } + + return unwrapped; + } + } + /** * Actual message dispatcher/multiplexer. *