Simplify logic in RedisMessageListenerContainer and supporting classes.

Closes #2662
Original pull request: #2663
This commit is contained in:
John Blum
2023-08-04 18:07:27 -07:00
committed by Mark Paluch
parent 5d297470d1
commit 979fb6ac6d
4 changed files with 274 additions and 265 deletions

View File

@@ -0,0 +1,71 @@
/*
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.listener;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Abstract base class for defining {@link Topic Topics}.
*
* @author John Blum
* @see org.springframework.data.redis.listener.Topic
* @since 3.2.0
*/
abstract class AbstractTopic implements Topic {
private final String name;
AbstractTopic(String label, String name) {
Assert.notNull(name,() -> label + " must not be null");
this.name = name;
}
@Override
public String getTopic() {
return this.name;
}
@Override
public boolean equals(@Nullable Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof AbstractTopic that)) {
return false;
}
// Must be exact Topic type
if (this.getClass() != that.getClass()) {
return false;
}
return ObjectUtils.nullSafeEquals(this.getTopic(), that.getTopic());
}
@Override
public int hashCode() {
return ObjectUtils.nullSafeHashCode(getTopic());
}
@Override
public String toString() {
return getTopic();
}
}

View File

@@ -15,71 +15,32 @@
*/
package org.springframework.data.redis.listener;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Channel topic implementation (maps to a Redis channel).
* {@link Topic Channel Topic} implementation mapping to a Redis channel.
*
* @author Costin Leau
* @author Mark Paluch
* @author John Blum
*/
public class ChannelTopic implements Topic {
private final String channelName;
/**
* Constructs a new {@link ChannelTopic} instance.
*
* @param name must not be {@literal null}.
*/
public ChannelTopic(String name) {
Assert.notNull(name, "Topic name must not be null");
this.channelName = name;
}
public class ChannelTopic extends AbstractTopic {
/**
* Create a new {@link ChannelTopic} for channel subscriptions.
*
* @param name the channel name, must not be {@literal null} or empty.
* @return the {@link ChannelTopic} for {@code channelName}.
* @param channelName {@link String name} of the Redis channel; must not be {@literal null} or {@literal empty}.
* @return the {@link ChannelTopic} for the given {@code channelName}.
* @since 2.1
*/
public static ChannelTopic of(String name) {
return new ChannelTopic(name);
public static ChannelTopic of(String channelName) {
return new ChannelTopic(channelName);
}
/**
* @return topic name.
* Constructs a new {@link ChannelTopic} instance.
*
* @param channelName must not be {@literal null}.
*/
@Override
public String getTopic() {
return channelName;
}
@Override
public String toString() {
return channelName;
}
@Override
public boolean equals(@Nullable Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ChannelTopic that = (ChannelTopic) o;
return ObjectUtils.nullSafeEquals(channelName, that.channelName);
}
@Override
public int hashCode() {
return ObjectUtils.nullSafeHashCode(channelName);
public ChannelTopic(String channelName) {
super("Topic name", channelName);
}
}

View File

@@ -15,38 +15,20 @@
*/
package org.springframework.data.redis.listener;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Pattern topic (matching multiple channels).
* {@link Topic} {@link String pattern} matching multiple Redis channels.
*
* @author Costin Leau
* @author Mark Paluch
* @author Christoph Strobl
*/
public class PatternTopic implements Topic {
private final String channelPattern;
/**
* Constructs a new {@link PatternTopic} instance.
*
* @param pattern must not be {@literal null}.
*/
public PatternTopic(String pattern) {
Assert.notNull(pattern, "Pattern must not be null");
this.channelPattern = pattern;
}
public class PatternTopic extends AbstractTopic {
/**
* Create a new {@link PatternTopic} for channel subscriptions based on a {@code pattern}.
*
* @param pattern the channel pattern, must not be {@literal null} or empty.
* @return the {@link PatternTopic} for {@code pattern}.
* @param pattern {@link String pattern} used to match channels; must not be {@literal null} or {@literal empty}.
* @return the {@link PatternTopic} for the given {@code pattern}.
* @since 2.1
*/
public static PatternTopic of(String pattern) {
@@ -54,33 +36,11 @@ public class PatternTopic implements Topic {
}
/**
* @return channel pattern.
* Constructs a new {@link PatternTopic} instance.
*
* @param channelPattern must not be {@literal null}.
*/
@Override
public String getTopic() {
return channelPattern;
}
@Override
public String toString() {
return channelPattern;
}
@Override
public boolean equals(@Nullable Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
PatternTopic that = (PatternTopic) o;
return ObjectUtils.nullSafeEquals(channelPattern, that.channelPattern);
}
@Override
public int hashCode() {
return ObjectUtils.nullSafeHashCode(channelPattern);
public PatternTopic(String channelPattern) {
super("Pattern", channelPattern);
}
}

View File

@@ -33,6 +33,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
@@ -98,20 +99,12 @@ import org.springframework.util.backoff.FixedBackOff;
* @author Way Joke
* @author Thomas Darimont
* @author Mark Paluch
* @author John Blum
* @see MessageListener
* @see SubscriptionListener
*/
public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {
/** Logger available to subclasses */
protected final Log logger = LogFactory.getLog(getClass());
/**
* Default thread name prefix: "RedisListeningContainer-".
*/
public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisMessageListenerContainer.class)
+ "-";
/**
* The default recovery interval: 5000 ms = 5 seconds.
*/
@@ -122,6 +115,17 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
*/
public static final long DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME = 2000L;
/**
* Default thread name prefix: "RedisListeningContainer-".
*/
public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisMessageListenerContainer.class)
+ "-";
/** Logger available to subclasses */
protected final Log logger = LogFactory.getLog(getClass());
private @Nullable ErrorHandler errorHandler;
private @Nullable Executor subscriptionExecutor;
private @Nullable Executor taskExecutor;
@@ -130,8 +134,6 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
private @Nullable String beanName;
private @Nullable ErrorHandler errorHandler;
private @Nullable Subscriber subscriber;
private final AtomicBoolean started = new AtomicBoolean();
@@ -142,50 +144,46 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
// whether the container has been initialized via afterPropertiesSet
private boolean afterPropertiesSet = false;
// whether the TaskExecutor was created by the container
private boolean manageExecutor = false;
// lookup maps
// to avoid creation of hashes for each message, the maps use raw byte arrays (wrapped to respect the equals/hashcode
// contract)
// lookup map between patterns and listeners
private final Map<ByteArrayWrapper, Collection<MessageListener>> patternMapping = new ConcurrentHashMap<>();
// lookup map between channels and listeners
private final Map<ByteArrayWrapper, Collection<MessageListener>> channelMapping = new ConcurrentHashMap<>();
// lookup map between listeners and channels
private final Map<MessageListener, Set<Topic>> listenerTopics = new ConcurrentHashMap<>();
private volatile RedisSerializer<String> serializer = RedisSerializer.string();
private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME;
private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);
private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME;
private volatile CompletableFuture<Void> listenFuture = new CompletableFuture<>();
private volatile CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
// Lookup maps; to avoid creation of hashes for each message, the maps use raw byte arrays (wrapped to respect
// the equals/hashcode contract)
// lookup map between channels and listeners
private final Map<ByteArrayWrapper, Collection<MessageListener>> channelMapping = new ConcurrentHashMap<>();
// lookup map between patterns and listeners
private final Map<ByteArrayWrapper, Collection<MessageListener>> patternMapping = new ConcurrentHashMap<>();
// lookup map between listeners and channels
private final Map<MessageListener, Set<Topic>> listenerTopics = new ConcurrentHashMap<>();
private volatile RedisSerializer<String> serializer = RedisSerializer.string();
@Override
public void afterPropertiesSet() {
Assert.state(!afterPropertiesSet, "Container already initialized");
Assert.state(!this.afterPropertiesSet, "Container already initialized");
Assert.notNull(this.connectionFactory, "RedisConnectionFactory is not set");
if (this.connectionFactory == null) {
throw new IllegalArgumentException("RedisConnectionFactory is not set");
if (this.taskExecutor == null) {
this.manageExecutor = true;
this.taskExecutor = createDefaultTaskExecutor();
}
if (taskExecutor == null) {
manageExecutor = true;
taskExecutor = createDefaultTaskExecutor();
}
if (subscriptionExecutor == null) {
subscriptionExecutor = taskExecutor;
if (this.subscriptionExecutor == null) {
this.subscriptionExecutor = this.taskExecutor;
}
this.subscriber = createSubscriber(connectionFactory, this.subscriptionExecutor);
afterPropertiesSet = true;
this.afterPropertiesSet = true;
}
/**
@@ -197,7 +195,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
* @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
*/
protected TaskExecutor createDefaultTaskExecutor() {
String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
String threadNamePrefix = this.beanName != null ? this.beanName + "-" : DEFAULT_THREAD_NAME_PREFIX;
return new SimpleAsyncTaskExecutor(threadNamePrefix);
}
@@ -208,17 +206,15 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
*/
@Override
public void destroy() throws Exception {
afterPropertiesSet = false;
this.afterPropertiesSet = false;
stop();
if (manageExecutor) {
if (taskExecutor instanceof DisposableBean) {
((DisposableBean) taskExecutor).destroy();
if (logger.isDebugEnabled()) {
logger.debug("Stopped internally-managed task executor");
}
if (this.manageExecutor) {
if (this.taskExecutor instanceof DisposableBean bean) {
bean.destroy();
logDebug(() -> "Stopped internally-managed task executor");
}
}
}
@@ -241,11 +237,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
public void start() {
if (started.compareAndSet(false, true)) {
if (logger.isDebugEnabled()) {
logger.debug("Starting RedisMessageListenerContainer...");
}
logDebug(() -> "Starting RedisMessageListenerContainer...");
lazyListen();
}
}
@@ -258,26 +250,22 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
CompletableFuture<Void> containerListenFuture = this.listenFuture;
State state = this.state.get();
CompletableFuture<Void> futureToAwait;
if (state.isPrepareListening()) {
futureToAwait = containerListenFuture;
} else {
futureToAwait = lazyListen(backOff.start());
}
CompletableFuture<Void> futureToAwait = state.isPrepareListening() ? containerListenFuture
: lazyListen(this.backOff.start());
try {
futureToAwait.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
} catch (InterruptedException cause) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
} catch (ExecutionException cause) {
if (e.getCause() instanceof DataAccessException) {
throw new RedisListenerExecutionFailedException(e.getMessage(), e.getCause());
if (cause.getCause() instanceof DataAccessException) {
throw new RedisListenerExecutionFailedException(cause.getMessage(), cause.getCause());
}
throw new CompletionException(e.getCause());
} catch (TimeoutException e) {
throw new IllegalStateException("Subscription registration timeout exceeded", e);
throw new CompletionException(cause.getCause());
} catch (TimeoutException cause) {
throw new IllegalStateException("Subscription registration timeout exceeded", cause);
}
}
@@ -292,6 +280,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
CompletableFuture<Void> containerListenFuture = this.listenFuture;
while (!doSubscribe(backOffExecution)) {
// busy-loop, allow for synchronization against doUnsubscribe therefore we want to retry.
containerListenFuture = this.listenFuture;
@@ -304,6 +293,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
CompletableFuture<Void> containerListenFuture = this.listenFuture;
CompletableFuture<Void> containerUnsubscribeFuture = this.unsubscribeFuture;
State state = this.state.get();
// someone has called stop while we were in here.
@@ -393,6 +383,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
CompletableFuture<Void> listenFuture = this.listenFuture;
State state = this.state.get();
if (!state.isListenerActivated()) {
return true;
}
@@ -421,12 +412,12 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
private void awaitRegistrationTime(CompletableFuture<Void> future) {
try {
future.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
} catch (InterruptedException cause) {
Thread.currentThread().interrupt();
} catch (ExecutionException | TimeoutException e) {
// ignore
} catch (ExecutionException | TimeoutException ignore) {
}
}
@@ -443,7 +434,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
* Return whether this container is currently active, that is, whether it has been set up but not shut down yet.
*/
public final boolean isActive() {
return afterPropertiesSet;
return this.afterPropertiesSet;
}
/**
@@ -453,7 +444,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
*/
@Nullable
public RedisConnectionFactory getConnectionFactory() {
return connectionFactory;
return this.connectionFactory;
}
/**
@@ -462,6 +453,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
this.connectionFactory = connectionFactory;
}
@@ -470,41 +462,6 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
this.beanName = name;
}
/**
* Sets the task executor used for running the message listeners when messages are received. If no task executor is
* set, an instance of {@link SimpleAsyncTaskExecutor} will be used by default. The task executor can be adjusted
* depending on the work done by the listeners and the number of messages coming in.
*
* @param taskExecutor The taskExecutor to set.
*/
public void setTaskExecutor(Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}
/**
* Sets the task execution used for subscribing to Redis channels. By default, if no executor is set, the
* {@link #setTaskExecutor(Executor)} will be used. In some cases, this might be undersired as the listening to the
* connection is a long running task.
* <p>
* Note: This implementation uses at most one long running thread (depending on whether there are any listeners
* registered or not) and up to two threads during the initial registration.
*
* @param subscriptionExecutor The subscriptionExecutor to set.
*/
public void setSubscriptionExecutor(Executor subscriptionExecutor) {
this.subscriptionExecutor = subscriptionExecutor;
}
/**
* Sets the serializer for converting the {@link Topic}s into low-level channels and patterns. By default,
* {@link StringRedisSerializer} is used.
*
* @param serializer The serializer to set.
*/
public void setTopicSerializer(RedisSerializer<String> serializer) {
this.serializer = serializer;
}
/**
* Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default
* there will be <b>no</b> ErrorHandler so that error-level logging is the only result.
@@ -526,6 +483,41 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
initMapping(listeners);
}
/**
* Sets the task execution used for subscribing to Redis channels. By default, if no executor is set, the
* {@link #setTaskExecutor(Executor)} will be used. In some cases, this might be undersired as the listening to the
* connection is a long running task.
* <p>
* Note: This implementation uses at most one long running thread (depending on whether there are any listeners
* registered or not) and up to two threads during the initial registration.
*
* @param subscriptionExecutor The subscriptionExecutor to set.
*/
public void setSubscriptionExecutor(Executor subscriptionExecutor) {
this.subscriptionExecutor = subscriptionExecutor;
}
/**
* Sets the task executor used for running the message listeners when messages are received. If no task executor is
* set, an instance of {@link SimpleAsyncTaskExecutor} will be used by default. The task executor can be adjusted
* depending on the work done by the listeners and the number of messages coming in.
*
* @param taskExecutor The taskExecutor to set.
*/
public void setTaskExecutor(Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}
/**
* Sets the serializer for converting the {@link Topic}s into low-level channels and patterns. By default,
* {@link StringRedisSerializer} is used.
*
* @param serializer The serializer to set.
*/
public void setTopicSerializer(RedisSerializer<String> serializer) {
this.serializer = serializer;
}
/**
* Adds a message listener to the (potentially running) container. If the container is running, the listener starts
* receiving (matching) messages as soon as possible.
@@ -585,10 +577,12 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
public void removeMessageListener(MessageListener listener) {
Assert.notNull(listener, "MessageListener must not be null");
removeMessageListener(listener, Collections.emptySet());
}
private void initMapping(Map<? extends MessageListener, Collection<? extends Topic>> listeners) {
// stop the listener if currently running
if (isRunning()) {
stop();
@@ -605,7 +599,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
// resume activity
if (afterPropertiesSet) {
if (this.afterPropertiesSet) {
start();
}
}
@@ -618,46 +612,32 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
List<byte[]> channels = new ArrayList<>(topics.size());
List<byte[]> patterns = new ArrayList<>(topics.size());
boolean trace = logger.isTraceEnabled();
// add listener mapping
Set<Topic> set = listenerTopics.get(listener);
if (set == null) {
set = new CopyOnWriteArraySet<>();
listenerTopics.put(listener, set);
}
set.addAll(topics);
for (Topic topic : topics) {
ByteArrayWrapper holder = new ByteArrayWrapper(serialize(topic));
ByteArrayWrapper serializedTopic = new ByteArrayWrapper(serialize(topic));
if (topic instanceof ChannelTopic) {
Collection<MessageListener> collection = channelMapping.get(holder);
if (collection == null) {
collection = new CopyOnWriteArraySet<>();
channelMapping.put(holder, collection);
}
Collection<MessageListener> collection = resolveMessageListeners(this.channelMapping, serializedTopic);
collection.add(listener);
channels.add(holder.getArray());
if (trace)
logger.trace("Adding listener '" + listener + "' on channel '" + topic.getTopic() + "'");
channels.add(serializedTopic.getArray());
logTrace(() -> "Adding listener '" + listener + "' on channel '" + topic.getTopic() + "'");
}
else if (topic instanceof PatternTopic) {
Collection<MessageListener> collection = patternMapping.get(holder);
if (collection == null) {
collection = new CopyOnWriteArraySet<>();
patternMapping.put(holder, collection);
}
Collection<MessageListener> collection = resolveMessageListeners(this.patternMapping, serializedTopic);
collection.add(listener);
patterns.add(holder.getArray());
if (trace)
logger.trace("Adding listener '" + listener + "' for pattern '" + topic.getTopic() + "'");
patterns.add(serializedTopic.getArray());
logTrace(() -> "Adding listener '" + listener + "' for pattern '" + topic.getTopic() + "'");
}
else {
throw new IllegalArgumentException("Unknown topic type '" + topic.getClass() + "'");
}
@@ -665,10 +645,12 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
boolean wasListening = isListening();
if (isRunning()) {
lazyListen();
// check the current listening state
if (wasListening) {
CompletableFuture<Void> future = new CompletableFuture<>();
getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns,
@@ -678,18 +660,51 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
try {
future.join();
} catch (CompletionException e) {
} catch (CompletionException cause) {
if (e.getCause() instanceof DataAccessException) {
throw new RedisListenerExecutionFailedException(e.getMessage(), e.getCause());
if (cause.getCause() instanceof DataAccessException) {
throw new RedisListenerExecutionFailedException(cause.getMessage(), cause.getCause());
}
throw e;
throw cause;
}
}
}
}
private void logDebug(Supplier<String> message) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(message.get());
}
}
private void logTrace(Supplier<String> message) {
if (this.logger.isTraceEnabled()) {
this.logger.trace(message.get());
}
}
private SynchronizingMessageListener.SubscriptionSynchronization newSubscriptionSynchronization(
Collection<byte[]> patterns, Collection<byte[]> channels, Runnable doneCallback) {
return new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, doneCallback);
}
private Collection<MessageListener> resolveMessageListeners(
Map<ByteArrayWrapper, Collection<MessageListener>> mapping, ByteArrayWrapper topic) {
Collection<MessageListener> messageListeners = mapping.get(topic);
if (messageListeners == null) {
messageListeners = new CopyOnWriteArraySet<>();
mapping.put(topic, messageListeners);
}
return messageListeners;
}
private void removeListener(@Nullable MessageListener listener, Collection<? extends Topic> topics) {
Assert.notNull(topics, "Topics must not be null");
@@ -725,14 +740,14 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
for (Topic topic : topics) {
ByteArrayWrapper holder = new ByteArrayWrapper(serialize(topic));
if (topic instanceof ChannelTopic) {
remove(listener, topic, holder, channelMapping, channelsToRemove);
if (trace) {
String msg = "listener '" + listener + "'";
logger.trace("Removing " + msg + " from channel '" + topic.getTopic() + "'");
logger.trace("Removing listener '" + listener + "' from channel '" + topic.getTopic() + "'");
}
}
@@ -740,8 +755,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
remove(listener, topic, holder, patternMapping, patternsToRemove);
if (trace) {
String msg = "listener '" + listener + "'";
logger.trace("Removing " + msg + " from pattern '" + topic.getTopic() + "'");
logger.trace("Removing listener '" + listener + "' from pattern '" + topic.getTopic() + "'");
}
}
}
@@ -779,6 +793,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
listenerTopics.remove(messageListener);
}
}
// if we removed everything, remove the empty holder collection
if (listeners.isEmpty()) {
mapping.remove(holder);
@@ -807,11 +822,12 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
public void setRecoveryBackoff(BackOff recoveryInterval) {
Assert.notNull(recoveryInterval, "Recovery interval must not be null");
this.backOff = recoveryInterval;
}
public long getMaxSubscriptionRegistrationWaitingTime() {
return maxSubscriptionRegistrationWaitingTime;
return this.maxSubscriptionRegistrationWaitingTime;
}
/**
@@ -841,10 +857,11 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
* @see #handleListenerException
*/
protected void processMessage(MessageListener listener, Message message, byte[] source) {
try {
listener.onMessage(message, source);
} catch (Throwable ex) {
handleListenerException(ex);
} catch (Throwable cause) {
handleListenerException(cause);
}
}
@@ -853,31 +870,33 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
* <p>
* The default implementation logs the exception at error level. This can be overridden in subclasses.
*
* @param ex the exception to handle
* @param cause the exception to handle
*/
protected void handleListenerException(Throwable ex) {
protected void handleListenerException(Throwable cause) {
if (isActive()) {
// Regular case: failed while active.
// Invoke ErrorHandler if available.
invokeErrorHandler(ex);
invokeErrorHandler(cause);
} else {
// Rare case: listener thread failed after container shutdown.
// Log at debug level, to avoid spamming the shutdown logger.
logger.debug("Listener exception after container shutdown", ex);
logger.debug("Listener exception after container shutdown", cause);
}
}
/**
* Invoke the registered ErrorHandler, if any. Log at error level otherwise.
*
* @param ex the uncaught error that arose during message processing.
* @param cause the uncaught error that arose during message processing.
* @see #setErrorHandler
*/
protected void invokeErrorHandler(Throwable ex) {
protected void invokeErrorHandler(Throwable cause) {
if (this.errorHandler != null) {
this.errorHandler.handleError(ex);
this.errorHandler.handleError(cause);
} else if (logger.isWarnEnabled()) {
logger.warn("Execution of message listener failed, and no ErrorHandler has been set", ex);
logger.warn("Execution of message listener failed, and no ErrorHandler has been set", cause);
}
}
@@ -924,6 +943,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
if (isRunning()) { // log only if the container is still running to prevent close errors from logging
logger.error("SubscriptionTask aborted with exception:", ex);
}
future.completeExceptionally(ex);
}
@@ -939,7 +959,6 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
try {
if (subscriptionExecutor instanceof ScheduledExecutorService) {
((ScheduledExecutorService) subscriptionExecutor).schedule(retryRunnable, recoveryInterval,
TimeUnit.MILLISECONDS);
@@ -961,6 +980,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
private <T> void propagate(@Nullable T value, @Nullable Throwable throwable, CompletableFuture<T> target) {
if (throwable != null) {
target.completeExceptionally(throwable);
} else {
@@ -972,8 +992,8 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
SubscriptionConsumer listenerConsumer) {
if (!CollectionUtils.isEmpty(listeners)) {
byte[] source = pattern.clone();
byte[] source = pattern.clone();
Executor executor = getRequiredTaskExecutor();
for (MessageListener messageListener : listeners) {
@@ -987,34 +1007,30 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
private void dispatchMessage(Collection<MessageListener> listeners, Message message, @Nullable byte[] pattern) {
byte[] source = (pattern != null ? pattern.clone() : message.getChannel());
Executor executor = getRequiredTaskExecutor();
for (MessageListener messageListener : listeners) {
executor.execute(() -> processMessage(messageListener, message, source));
}
}
private boolean hasTopics() {
return !channelMapping.isEmpty() || !patternMapping.isEmpty();
return !this.channelMapping.isEmpty() || !this.patternMapping.isEmpty();
}
private Subscriber getRequiredSubscriber() {
if (this.subscriber == null) {
throw new IllegalStateException(
"Subscriber not created; Configure RedisConnectionFactory to create a Subscriber");
}
Assert.state(this.subscriber != null,
"Subscriber not created; Configure RedisConnectionFactory to create a Subscriber");
return subscriber;
return this.subscriber;
}
private Executor getRequiredTaskExecutor() {
if (this.taskExecutor == null) {
throw new IllegalStateException("No executor configured");
}
Assert.state(this.taskExecutor != null, "No executor configured");
return taskExecutor;
return this.taskExecutor;
}
@SuppressWarnings("ConstantConditions")
@@ -1408,16 +1424,18 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
CompletableFuture<Void> subscriptionDone, Collection<byte[]> patterns, Collection<byte[]> channels) {
Collection<byte[]> initiallySubscribeToChannels;
if (!patterns.isEmpty() && !channels.isEmpty()) {
initiallySubscribeToChannels = Collections.emptySet();
// perform channel subscription later as the first call to (p)subscribe blocks the client
addSynchronization(
new SynchronizingMessageListener.SubscriptionSynchronization(patterns, Collections.emptySet(), () -> {
try {
subscribeChannel(channels.toArray(new byte[0][]));
} catch (Exception e) {
handleSubscriptionException(subscriptionDone, backOffExecution, e);
} catch (Exception cause) {
handleSubscriptionException(subscriptionDone, backOffExecution, cause);
}
}));
} else {
@@ -1433,11 +1451,10 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
doSubscribe(connection, patterns, initiallySubscribeToChannels);
closeConnection();
unsubscribeFuture.complete(null);
} catch (Throwable t) {
handleSubscriptionException(subscriptionDone, backOffExecution, t);
} catch (Throwable cause) {
handleSubscriptionException(subscriptionDone, backOffExecution, cause);
}
});
}
}
}