diff --git a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/connection/jedis/JedisSubscription.java b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/connection/jedis/JedisSubscription.java index bfdda40dd..4f47c2be9 100644 --- a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/connection/jedis/JedisSubscription.java +++ b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/connection/jedis/JedisSubscription.java @@ -119,7 +119,7 @@ class JedisSubscription implements Subscription { @Override public void subscribe(byte[]... channels) { - Assert.notEmpty(patterns, "at least one pattern required"); + Assert.notEmpty(channels, "at least one channel required"); synchronized (this.channels) { for (byte[] bs : channels) { diff --git a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/RedisMessageListenerContainer.java b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/RedisMessageListenerContainer.java index aa7bd513f..99e89f3b4 100644 --- a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/RedisMessageListenerContainer.java +++ b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/RedisMessageListenerContainer.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,6 +67,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisMessageListenerContainer.class) + "-"; + private long initWait = TimeUnit.SECONDS.toMillis(5); private Executor subscriptionExecutor; @@ -98,7 +100,6 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab private final SubscriptionTask subscriptionTask = new SubscriptionTask(); - private final MessageListener multiplexer = new DispatchMessageListener(); private volatile RedisSerializer serializer = new StringRedisSerializer(); @@ -171,7 +172,18 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab public void start() { if (!running) { running = true; - lazyListen(); + // wait for the subscription to start before returning + // technically speaking we can only be notified right before the subscription starts + synchronized (monitor) { + lazyListen(); + try { + // wait up to 5 seconds + monitor.wait(initWait); + } catch (InterruptedException e) { + // stop waiting + } + } + if (log.isDebugEnabled()) { log.debug("Started RedisMessageListenerContainer"); } @@ -181,7 +193,14 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab @Override public void stop() { running = false; - subscriptionTask.cancel(); + synchronized (monitor) { + subscriptionTask.cancel(); + try { + monitor.wait(initWait); + } catch (InterruptedException ex) { + // stop waiting + } + } if (log.isDebugEnabled()) { log.debug("Stopped RedisMessageListenerContainer"); @@ -393,7 +412,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab */ private class PatternSubscriptionTask implements SchedulingAwareRunnable { - private long WAIT = 1000; + private long WAIT = 500; private long ROUNDS = 3; @Override @@ -444,6 +463,10 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab // NB: each Xsubscribe call blocks + synchronized (monitor) { + monitor.notify(); + } + // subscribe one way or the other // and schedule the rest if (!channelMapping.isEmpty()) { @@ -460,6 +483,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab } finally { // this block is executed once the subscription has ended // meaning cleanup is required + listening = false; if (connection != null) { @@ -470,6 +494,12 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab } } } + + // done with the thread, app can be destroyed + synchronized (monitor) { + monitor.notify(); + } + } } diff --git a/spring-data-redis/src/test/java/org/springframework/data/keyvalue/redis/listener/PubSubTests.java b/spring-data-redis/src/test/java/org/springframework/data/keyvalue/redis/listener/PubSubTests.java index 75f1a744f..c0a2fb04c 100644 --- a/spring-data-redis/src/test/java/org/springframework/data/keyvalue/redis/listener/PubSubTests.java +++ b/spring-data-redis/src/test/java/org/springframework/data/keyvalue/redis/listener/PubSubTests.java @@ -116,11 +116,6 @@ public class PubSubTests { @Test public void testContainerSubscribe() throws Exception { - - - // wait for the container to start the registration - - Thread.sleep(500); String payload1 = "do"; String payload2 = "re mi"; template.convertAndSend(CHANNEL, payload1); @@ -137,13 +132,7 @@ public class PubSubTests { @Test public void testMessageBatch() throws Exception { - - container.addMessageListener(adapter, Arrays.asList(new ChannelTopic(CHANNEL))); - - // wait for the container to start the registration - int COUNT = 10; - Thread.sleep(500); for (int i = 0; i < COUNT; i++) { template.convertAndSend(CHANNEL, "message=" + i); }