DATAKV-22

+ RedisListeningContainer updates - registration is done, still need to handle unregistration and initial batching of subscribe and psubscribe
This commit is contained in:
Costin Leau
2011-01-13 20:34:32 +02:00
parent 03c92e376e
commit c16901d9df

View File

@@ -28,11 +28,16 @@ import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.keyvalue.redis.connection.Message;
import org.springframework.data.keyvalue.redis.connection.MessageListener;
import org.springframework.data.keyvalue.redis.connection.RedisConnection;
import org.springframework.data.keyvalue.redis.connection.RedisConnectionFactory;
import org.springframework.data.keyvalue.redis.serializer.RedisSerializer;
import org.springframework.data.keyvalue.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
/**
@@ -44,7 +49,7 @@ import org.springframework.util.CollectionUtils;
* the message dispatch being done through the task executor.
*
* <p/>
* Note the container uses the connection only if at least one listener is configured.
* Note the container uses the connection in a lazy fashion (only if at least one listener is configured).
*
* @author Costin Leau
*/
@@ -52,7 +57,14 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
private static final Log log = LogFactory.getLog(RedisListeningContainer.class);
private Executor connectionWorker;
/**
* Default thread name prefix: "RedisListeningContainer-".
*/
public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisListeningContainer.class)
+ "-";
private Executor subscriptionExecutor;
private Executor taskExecutor;
@@ -60,9 +72,17 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
private String beanName;
private final Object monitor = new Object();
// whether the container is running (or not)
private volatile boolean running = false;
// whether the container has been initialized
private volatile boolean initialized = false;
// whether the container uses a connection or not
// (as the container might be running but w/o listeners, it won't use any resources)
private volatile boolean listening = false;
private volatile boolean manageExecutor = false;
// lookup maps
@@ -73,22 +93,49 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
// lookup map between channels and listeners
private final Map<ArrayHolder, Collection<MessageListener>> channelMapping = new ConcurrentHashMap<ArrayHolder, Collection<MessageListener>>();
private final SubscriptionTask subscriptionTask = new SubscriptionTask();
private final MessageListener multiplexer = new DispatchMessageListener();
private RedisSerializer<String> serializer = new StringRedisSerializer();
@Override
public void afterPropertiesSet() throws Exception {
//startListening();
public void afterPropertiesSet() {
if (taskExecutor == null) {
manageExecutor = true;
taskExecutor = createDefaultTaskExecutor();
}
if (subscriptionExecutor == null) {
subscriptionExecutor = taskExecutor;
}
start();
initialized = true;
}
/**
* Creates a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
* <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
* with the specified bean name (or the class name, if no bean name specified) as thread name prefix.
* @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
*/
protected TaskExecutor createDefaultTaskExecutor() {
String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
return new SimpleAsyncTaskExecutor(threadNamePrefix);
}
@Override
public void destroy() throws Exception {
initialized = false;
// stop listening
//stopListening();
stop();
if (manageExecutor) {
if (taskExecutor instanceof DisposableBean) {
((DisposableBean) taskExecutor).destroy();
}
}
}
@Override
@@ -98,7 +145,8 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
@Override
public void stop(Runnable callback) {
throw new UnsupportedOperationException();
stop();
callback.run();
}
@Override
@@ -114,11 +162,15 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
@Override
public void start() {
throw new UnsupportedOperationException();
if (!running) {
running = true;
lazyListen();
}
}
@Override
public void stop() {
running = false;
throw new UnsupportedOperationException();
}
@@ -145,7 +197,32 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
/**
* Sets the serializer for converting the raw channels and patterns into Strings.
* Sets the task executor used for running the message listeners when messages are received.
* If no task executor is set, an instance of {@link SimpleAsyncTaskExecutor} will be used by default.
* The task executor can be adjusted depending on the work done by the listeners and the number of
* messages coming in.
*
* @param taskExecutor The taskExecutor to set.
*/
public void setTaskExecutor(Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}
/**
* Sets the task execution used for subscribing to Redis channels. By default, if no executor is set,
* the {@link #setTaskExecutor(Executor)} will be used. In some cases, this might be undersired as
* the listening to the connection is a long running task.
*
* <p/>Note: This implementation uses at most one thread (depending on whether there are any listeners registered or not).
*
* @param subscriptionExecutor The subscriptionExecutor to set.
*/
public void setSubscriptionExecutor(Executor subscriptionExecutor) {
this.subscriptionExecutor = subscriptionExecutor;
}
/**
* Sets the serializer for converting the {@link Topic}s into low-level channels and patterns.
* By default, {@link StringRedisSerializer} is used.
*
* @param serializer The serializer to set.
@@ -176,7 +253,8 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
* @param topics message listener topic
*/
public void addMessageListener(MessageListener listener, Collection<Topic> topics) {
addListener(listener, topics);
lazyListen();
}
private void initMapping(Map<? extends MessageListener, Collection<Topic>> listeners) {
@@ -200,6 +278,29 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
}
}
/**
* Method inspecting whether listening for messages (and thus using a thread) is actually needed.
*/
private void lazyListen() {
boolean debug = log.isDebugEnabled();
if (channelMapping.size() > 0 || patternMapping.size() > 0) {
subscriptionExecutor.execute(subscriptionTask);
listening = true;
if (debug) {
log.debug("Started listening for Redis messages");
}
}
else {
listening = false;
if (debug) {
log.debug("Postpone listening for Redis messages until actual listeners are added");
}
}
}
private void addListener(MessageListener listener, Collection<Topic> topics) {
for (Topic topic : topics) {
@@ -229,6 +330,61 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
}
}
/**
* Runnable used for Redis subscription. Implemented as a dedicated class to provide as many hints
* as possible to the underlying thread pool.
*
* @author Costin Leau
*/
private class SubscriptionTask implements SchedulingAwareRunnable {
@Override
public boolean isLongLived() {
return true;
}
@Override
public void run() {
RedisConnection connection = connectionFactory.getConnection();
try {
if (connection.isSubscribed()) {
listening = false;
throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
}
// NB: each Xsubscribe call blocks
// subscribe one way or the other
// and schedule the rest
if (!channelMapping.isEmpty()) {
connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet()));
}
else {
connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
}
} finally {
if (connection != null) {
connection.close();
}
}
}
private byte[][] unwrap(Collection<ArrayHolder> holders) {
if (CollectionUtils.isEmpty(holders)) {
return new byte[0][];
}
byte[][] unwrapped = new byte[holders.size()][];
int index = 0;
for (ArrayHolder arrayHolder : holders) {
unwrapped[index++] = arrayHolder.array;
}
return unwrapped;
}
}
/**
* Actual message dispatcher/multiplexer.
*