DATAKV-22
+ RedisListeningContainer updates - registration is done, still need to handle unregistration and initial batching of subscribe and psubscribe
This commit is contained in:
@@ -28,11 +28,16 @@ import org.springframework.beans.factory.BeanNameAware;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.data.keyvalue.redis.connection.Message;
|
||||
import org.springframework.data.keyvalue.redis.connection.MessageListener;
|
||||
import org.springframework.data.keyvalue.redis.connection.RedisConnection;
|
||||
import org.springframework.data.keyvalue.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.keyvalue.redis.serializer.RedisSerializer;
|
||||
import org.springframework.data.keyvalue.redis.serializer.StringRedisSerializer;
|
||||
import org.springframework.scheduling.SchedulingAwareRunnable;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
/**
|
||||
@@ -44,7 +49,7 @@ import org.springframework.util.CollectionUtils;
|
||||
* the message dispatch being done through the task executor.
|
||||
*
|
||||
* <p/>
|
||||
* Note the container uses the connection only if at least one listener is configured.
|
||||
* Note the container uses the connection in a lazy fashion (only if at least one listener is configured).
|
||||
*
|
||||
* @author Costin Leau
|
||||
*/
|
||||
@@ -52,7 +57,14 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
|
||||
|
||||
private static final Log log = LogFactory.getLog(RedisListeningContainer.class);
|
||||
|
||||
private Executor connectionWorker;
|
||||
/**
|
||||
* Default thread name prefix: "RedisListeningContainer-".
|
||||
*/
|
||||
public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisListeningContainer.class)
|
||||
+ "-";
|
||||
|
||||
|
||||
private Executor subscriptionExecutor;
|
||||
|
||||
private Executor taskExecutor;
|
||||
|
||||
@@ -60,9 +72,17 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
|
||||
|
||||
private String beanName;
|
||||
|
||||
|
||||
private final Object monitor = new Object();
|
||||
// whether the container is running (or not)
|
||||
private volatile boolean running = false;
|
||||
// whether the container has been initialized
|
||||
private volatile boolean initialized = false;
|
||||
// whether the container uses a connection or not
|
||||
// (as the container might be running but w/o listeners, it won't use any resources)
|
||||
private volatile boolean listening = false;
|
||||
|
||||
private volatile boolean manageExecutor = false;
|
||||
|
||||
|
||||
// lookup maps
|
||||
@@ -73,22 +93,49 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
|
||||
// lookup map between channels and listeners
|
||||
private final Map<ArrayHolder, Collection<MessageListener>> channelMapping = new ConcurrentHashMap<ArrayHolder, Collection<MessageListener>>();
|
||||
|
||||
private final SubscriptionTask subscriptionTask = new SubscriptionTask();
|
||||
|
||||
private final MessageListener multiplexer = new DispatchMessageListener();
|
||||
private RedisSerializer<String> serializer = new StringRedisSerializer();
|
||||
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
//startListening();
|
||||
public void afterPropertiesSet() {
|
||||
if (taskExecutor == null) {
|
||||
manageExecutor = true;
|
||||
taskExecutor = createDefaultTaskExecutor();
|
||||
}
|
||||
|
||||
if (subscriptionExecutor == null) {
|
||||
subscriptionExecutor = taskExecutor;
|
||||
}
|
||||
|
||||
start();
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
|
||||
* <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
|
||||
* with the specified bean name (or the class name, if no bean name specified) as thread name prefix.
|
||||
* @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
|
||||
*/
|
||||
protected TaskExecutor createDefaultTaskExecutor() {
|
||||
String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
|
||||
return new SimpleAsyncTaskExecutor(threadNamePrefix);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
initialized = false;
|
||||
|
||||
// stop listening
|
||||
//stopListening();
|
||||
stop();
|
||||
|
||||
if (manageExecutor) {
|
||||
if (taskExecutor instanceof DisposableBean) {
|
||||
((DisposableBean) taskExecutor).destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -98,7 +145,8 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
|
||||
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
throw new UnsupportedOperationException();
|
||||
stop();
|
||||
callback.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -114,11 +162,15 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
throw new UnsupportedOperationException();
|
||||
if (!running) {
|
||||
running = true;
|
||||
lazyListen();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
running = false;
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@@ -145,7 +197,32 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
|
||||
|
||||
|
||||
/**
|
||||
* Sets the serializer for converting the raw channels and patterns into Strings.
|
||||
* Sets the task executor used for running the message listeners when messages are received.
|
||||
* If no task executor is set, an instance of {@link SimpleAsyncTaskExecutor} will be used by default.
|
||||
* The task executor can be adjusted depending on the work done by the listeners and the number of
|
||||
* messages coming in.
|
||||
*
|
||||
* @param taskExecutor The taskExecutor to set.
|
||||
*/
|
||||
public void setTaskExecutor(Executor taskExecutor) {
|
||||
this.taskExecutor = taskExecutor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the task execution used for subscribing to Redis channels. By default, if no executor is set,
|
||||
* the {@link #setTaskExecutor(Executor)} will be used. In some cases, this might be undersired as
|
||||
* the listening to the connection is a long running task.
|
||||
*
|
||||
* <p/>Note: This implementation uses at most one thread (depending on whether there are any listeners registered or not).
|
||||
*
|
||||
* @param subscriptionExecutor The subscriptionExecutor to set.
|
||||
*/
|
||||
public void setSubscriptionExecutor(Executor subscriptionExecutor) {
|
||||
this.subscriptionExecutor = subscriptionExecutor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the serializer for converting the {@link Topic}s into low-level channels and patterns.
|
||||
* By default, {@link StringRedisSerializer} is used.
|
||||
*
|
||||
* @param serializer The serializer to set.
|
||||
@@ -176,7 +253,8 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
|
||||
* @param topics message listener topic
|
||||
*/
|
||||
public void addMessageListener(MessageListener listener, Collection<Topic> topics) {
|
||||
|
||||
addListener(listener, topics);
|
||||
lazyListen();
|
||||
}
|
||||
|
||||
private void initMapping(Map<? extends MessageListener, Collection<Topic>> listeners) {
|
||||
@@ -200,6 +278,29 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Method inspecting whether listening for messages (and thus using a thread) is actually needed.
|
||||
*/
|
||||
private void lazyListen() {
|
||||
boolean debug = log.isDebugEnabled();
|
||||
|
||||
if (channelMapping.size() > 0 || patternMapping.size() > 0) {
|
||||
subscriptionExecutor.execute(subscriptionTask);
|
||||
listening = true;
|
||||
|
||||
if (debug) {
|
||||
log.debug("Started listening for Redis messages");
|
||||
}
|
||||
}
|
||||
else {
|
||||
listening = false;
|
||||
if (debug) {
|
||||
log.debug("Postpone listening for Redis messages until actual listeners are added");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void addListener(MessageListener listener, Collection<Topic> topics) {
|
||||
for (Topic topic : topics) {
|
||||
|
||||
@@ -229,6 +330,61 @@ public class RedisListeningContainer implements InitializingBean, DisposableBean
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runnable used for Redis subscription. Implemented as a dedicated class to provide as many hints
|
||||
* as possible to the underlying thread pool.
|
||||
*
|
||||
* @author Costin Leau
|
||||
*/
|
||||
private class SubscriptionTask implements SchedulingAwareRunnable {
|
||||
|
||||
@Override
|
||||
public boolean isLongLived() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
RedisConnection connection = connectionFactory.getConnection();
|
||||
try {
|
||||
if (connection.isSubscribed()) {
|
||||
listening = false;
|
||||
throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
|
||||
}
|
||||
|
||||
// NB: each Xsubscribe call blocks
|
||||
|
||||
// subscribe one way or the other
|
||||
// and schedule the rest
|
||||
if (!channelMapping.isEmpty()) {
|
||||
connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet()));
|
||||
}
|
||||
else {
|
||||
connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
|
||||
}
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private byte[][] unwrap(Collection<ArrayHolder> holders) {
|
||||
if (CollectionUtils.isEmpty(holders)) {
|
||||
return new byte[0][];
|
||||
}
|
||||
|
||||
byte[][] unwrapped = new byte[holders.size()][];
|
||||
|
||||
int index = 0;
|
||||
for (ArrayHolder arrayHolder : holders) {
|
||||
unwrapped[index++] = arrayHolder.array;
|
||||
}
|
||||
|
||||
return unwrapped;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Actual message dispatcher/multiplexer.
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user