+ improve Message container behaviour by waiting (as much as possible) for the initial subscription to complete
This commit is contained in:
@@ -119,7 +119,7 @@ class JedisSubscription implements Subscription {
|
||||
|
||||
@Override
|
||||
public void subscribe(byte[]... channels) {
|
||||
Assert.notEmpty(patterns, "at least one pattern required");
|
||||
Assert.notEmpty(channels, "at least one channel required");
|
||||
|
||||
synchronized (this.channels) {
|
||||
for (byte[] bs : channels) {
|
||||
|
||||
@@ -23,6 +23,7 @@ import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@@ -66,6 +67,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisMessageListenerContainer.class)
|
||||
+ "-";
|
||||
|
||||
private long initWait = TimeUnit.SECONDS.toMillis(5);
|
||||
|
||||
private Executor subscriptionExecutor;
|
||||
|
||||
@@ -98,7 +100,6 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
|
||||
private final SubscriptionTask subscriptionTask = new SubscriptionTask();
|
||||
|
||||
private final MessageListener multiplexer = new DispatchMessageListener();
|
||||
private volatile RedisSerializer<String> serializer = new StringRedisSerializer();
|
||||
|
||||
|
||||
@@ -171,7 +172,18 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
public void start() {
|
||||
if (!running) {
|
||||
running = true;
|
||||
lazyListen();
|
||||
// wait for the subscription to start before returning
|
||||
// technically speaking we can only be notified right before the subscription starts
|
||||
synchronized (monitor) {
|
||||
lazyListen();
|
||||
try {
|
||||
// wait up to 5 seconds
|
||||
monitor.wait(initWait);
|
||||
} catch (InterruptedException e) {
|
||||
// stop waiting
|
||||
}
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Started RedisMessageListenerContainer");
|
||||
}
|
||||
@@ -181,7 +193,14 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
@Override
|
||||
public void stop() {
|
||||
running = false;
|
||||
subscriptionTask.cancel();
|
||||
synchronized (monitor) {
|
||||
subscriptionTask.cancel();
|
||||
try {
|
||||
monitor.wait(initWait);
|
||||
} catch (InterruptedException ex) {
|
||||
// stop waiting
|
||||
}
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Stopped RedisMessageListenerContainer");
|
||||
@@ -393,7 +412,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
*/
|
||||
private class PatternSubscriptionTask implements SchedulingAwareRunnable {
|
||||
|
||||
private long WAIT = 1000;
|
||||
private long WAIT = 500;
|
||||
private long ROUNDS = 3;
|
||||
|
||||
@Override
|
||||
@@ -444,6 +463,10 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
|
||||
// NB: each Xsubscribe call blocks
|
||||
|
||||
synchronized (monitor) {
|
||||
monitor.notify();
|
||||
}
|
||||
|
||||
// subscribe one way or the other
|
||||
// and schedule the rest
|
||||
if (!channelMapping.isEmpty()) {
|
||||
@@ -460,6 +483,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
} finally {
|
||||
// this block is executed once the subscription has ended
|
||||
// meaning cleanup is required
|
||||
|
||||
listening = false;
|
||||
|
||||
if (connection != null) {
|
||||
@@ -470,6 +494,12 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// done with the thread, app can be destroyed
|
||||
synchronized (monitor) {
|
||||
monitor.notify();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -116,11 +116,6 @@ public class PubSubTests<T> {
|
||||
|
||||
@Test
|
||||
public void testContainerSubscribe() throws Exception {
|
||||
|
||||
|
||||
// wait for the container to start the registration
|
||||
|
||||
Thread.sleep(500);
|
||||
String payload1 = "do";
|
||||
String payload2 = "re mi";
|
||||
template.convertAndSend(CHANNEL, payload1);
|
||||
@@ -137,13 +132,7 @@ public class PubSubTests<T> {
|
||||
|
||||
@Test
|
||||
public void testMessageBatch() throws Exception {
|
||||
|
||||
container.addMessageListener(adapter, Arrays.asList(new ChannelTopic(CHANNEL)));
|
||||
|
||||
// wait for the container to start the registration
|
||||
|
||||
int COUNT = 10;
|
||||
Thread.sleep(500);
|
||||
for (int i = 0; i < COUNT; i++) {
|
||||
template.convertAndSend(CHANNEL, "message=" + i);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user