Polishing.

Replace topic label with assertion message. Reorder methods to align with Spring style.

See #2662
Original pull request: #2663
This commit is contained in:
Mark Paluch
2023-08-08 10:19:30 +02:00
parent 979fb6ac6d
commit 6207a4f771
4 changed files with 191 additions and 205 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,22 +16,19 @@
package org.springframework.data.redis.listener;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Abstract base class for defining {@link Topic Topics}.
*
* @author John Blum
* @see org.springframework.data.redis.listener.Topic
* @since 3.2.0
* @since 3.1.3
*/
abstract class AbstractTopic implements Topic {
private final String name;
AbstractTopic(String label, String name) {
Assert.notNull(name,() -> label + " must not be null");
AbstractTopic(String name) {
this.name = name;
}

View File

@@ -15,6 +15,8 @@
*/
package org.springframework.data.redis.listener;
import org.springframework.util.Assert;
/**
* {@link Topic Channel Topic} implementation mapping to a Redis channel.
*
@@ -24,23 +26,25 @@ package org.springframework.data.redis.listener;
*/
public class ChannelTopic extends AbstractTopic {
/**
* Constructs a new {@link ChannelTopic} instance.
*
* @param channelName must not be {@literal null}.
*/
public ChannelTopic(String channelName) {
super(channelName);
Assert.notNull(channelName, "Channel name must not be null");
}
/**
* Create a new {@link ChannelTopic} for channel subscriptions.
*
* @param channelName {@link String name} of the Redis channel; must not be {@literal null} or {@literal empty}.
* @param channelName {@link String name} of the Redis channel; must not be {@literal null}.
* @return the {@link ChannelTopic} for the given {@code channelName}.
* @since 2.1
*/
public static ChannelTopic of(String channelName) {
return new ChannelTopic(channelName);
}
/**
* Constructs a new {@link ChannelTopic} instance.
*
* @param channelName must not be {@literal null}.
*/
public ChannelTopic(String channelName) {
super("Topic name", channelName);
}
}

View File

@@ -15,6 +15,8 @@
*/
package org.springframework.data.redis.listener;
import org.springframework.util.Assert;
/**
* {@link Topic} {@link String pattern} matching multiple Redis channels.
*
@@ -24,23 +26,25 @@ package org.springframework.data.redis.listener;
*/
public class PatternTopic extends AbstractTopic {
/**
* Constructs a new {@link PatternTopic} instance.
*
* @param channelPattern must not be {@literal null}.
*/
public PatternTopic(String channelPattern) {
super(channelPattern);
Assert.notNull(channelPattern, "Pattern must not be null");
}
/**
* Create a new {@link PatternTopic} for channel subscriptions based on a {@code pattern}.
*
* @param pattern {@link String pattern} used to match channels; must not be {@literal null} or {@literal empty}.
* @param pattern {@link String pattern} used to match channels; must not be {@literal null} or empty.
* @return the {@link PatternTopic} for the given {@code pattern}.
* @since 2.1
*/
public static PatternTopic of(String pattern) {
return new PatternTopic(pattern);
}
/**
* Constructs a new {@link PatternTopic} instance.
*
* @param channelPattern must not be {@literal null}.
*/
public PatternTopic(String channelPattern) {
super("Pattern", channelPattern);
}
}

View File

@@ -38,7 +38,6 @@ import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
@@ -119,7 +118,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
* Default thread name prefix: "RedisListeningContainer-".
*/
public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisMessageListenerContainer.class)
+ "-";
+ "-";
/** Logger available to subclasses */
protected final Log logger = LogFactory.getLog(getClass());
@@ -132,28 +131,25 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
private @Nullable RedisConnectionFactory connectionFactory;
private RedisSerializer<String> serializer = RedisSerializer.string();
private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME;
private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);
private @Nullable String beanName;
private @Nullable Subscriber subscriber;
private final AtomicBoolean started = new AtomicBoolean();
// whether the container is running (or not)
private final AtomicReference<State> state = new AtomicReference<>(State.notListening());
// whether the container has been initialized via afterPropertiesSet
private boolean afterPropertiesSet = false;
// whether the TaskExecutor was created by the container
private boolean manageExecutor = false;
private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME;
private @Nullable Subscriber subscriber;
private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);
private final AtomicBoolean started = new AtomicBoolean();
private volatile CompletableFuture<Void> listenFuture = new CompletableFuture<>();
private volatile CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
// whether the container is running (or not)
private final AtomicReference<State> state = new AtomicReference<>(State.notListening());
// Lookup maps; to avoid creation of hashes for each message, the maps use raw byte arrays (wrapped to respect
// the equals/hashcode contract)
@@ -165,7 +161,131 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
// lookup map between listeners and channels
private final Map<MessageListener, Set<Topic>> listenerTopics = new ConcurrentHashMap<>();
private volatile RedisSerializer<String> serializer = RedisSerializer.string();
private volatile CompletableFuture<Void> listenFuture = new CompletableFuture<>();
private volatile CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
/**
* Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default,
* there will be <b>no</b> ErrorHandler so that error-level logging is the only result.
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
/**
* Sets the task execution used for subscribing to Redis channels. By default, if no executor is set, the
* {@link #setTaskExecutor(Executor)} will be used. In some cases, this might be undesired as the listening to the
* connection is a long-running task.
* <p>
* Note: This implementation uses at most one long-running thread (depending on whether there are any listeners
* registered or not) and up to two threads during the initial registration.
*
* @param subscriptionExecutor the subscriptionExecutor to set.
*/
public void setSubscriptionExecutor(Executor subscriptionExecutor) {
this.subscriptionExecutor = subscriptionExecutor;
}
/**
* Sets the task executor used for running the message listeners when messages are received. If no task executor is
* set, an instance of {@link SimpleAsyncTaskExecutor} will be used by default. The task executor can be adjusted
* depending on the work done by the listeners and the number of messages coming in.
*
* @param taskExecutor the taskExecutor to set.
*/
public void setTaskExecutor(Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}
/**
* Returns the connectionFactory.
*
* @return Returns the connectionFactory
*/
@Nullable
public RedisConnectionFactory getConnectionFactory() {
return this.connectionFactory;
}
/**
* @param connectionFactory The connectionFactory to set.
*/
public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
this.connectionFactory = connectionFactory;
}
/**
* Sets the serializer for converting the {@link Topic}s into low-level channels and patterns. By default,
* {@link StringRedisSerializer} is used.
*
* @param serializer The serializer to set.
*/
public void setTopicSerializer(RedisSerializer<String> serializer) {
this.serializer = serializer;
}
public long getMaxSubscriptionRegistrationWaitingTime() {
return this.maxSubscriptionRegistrationWaitingTime;
}
/**
* Specify the max time to wait for subscription registrations, in <strong>milliseconds</strong> The default is
* {@code 2000ms}, that is, 2 second. The timeout applies for awaiting the subscription registration. Note that
* subscriptions can be created asynchronously and an expired timeout does not cancel the timeout.
*
* @param maxSubscriptionRegistrationWaitingTime the maximum subscription registration wait time
* @see #DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME
* @see #start()
*/
public void setMaxSubscriptionRegistrationWaitingTime(long maxSubscriptionRegistrationWaitingTime) {
this.maxSubscriptionRegistrationWaitingTime = maxSubscriptionRegistrationWaitingTime;
}
/**
* Specify the interval between recovery attempts, in <b>milliseconds</b>. The default is 5000 ms, that is, 5 seconds.
*
* @see #handleSubscriptionException
* @see #setRecoveryBackoff(BackOff)
*/
public void setRecoveryInterval(long recoveryInterval) {
setRecoveryBackoff(new FixedBackOff(recoveryInterval, FixedBackOff.UNLIMITED_ATTEMPTS));
}
/**
* Specify the interval {@link BackOff} recovery attempts.
*
* @see #handleSubscriptionException
* @see #setRecoveryInterval(long)
* @since 2.7
*/
public void setRecoveryBackoff(BackOff recoveryInterval) {
Assert.notNull(recoveryInterval, "Recovery interval must not be null");
this.backOff = recoveryInterval;
}
/**
* Attaches the given listeners (and their topics) to the container.
* <p>
* Note: it's possible to call this method while the container is running forcing a reinitialization of the container.
* Note however that this might cause some messages to be lost (while the container reinitializes) - hence calling
* this method at runtime is considered advanced usage.
*
* @param listeners map of message listeners and their associated topics
*/
public void setMessageListeners(Map<? extends MessageListener, Collection<? extends Topic>> listeners) {
initMapping(listeners);
}
@Override
public void setBeanName(String name) {
this.beanName = name;
}
@Override
public void afterPropertiesSet() {
@@ -417,8 +537,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
future.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
} catch (InterruptedException cause) {
Thread.currentThread().interrupt();
} catch (ExecutionException | TimeoutException ignore) {
}
} catch (ExecutionException | TimeoutException ignore) {}
}
@Override
@@ -437,87 +556,6 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
return this.afterPropertiesSet;
}
/**
* Returns the connectionFactory.
*
* @return Returns the connectionFactory
*/
@Nullable
public RedisConnectionFactory getConnectionFactory() {
return this.connectionFactory;
}
/**
* @param connectionFactory The connectionFactory to set.
*/
public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
this.connectionFactory = connectionFactory;
}
@Override
public void setBeanName(String name) {
this.beanName = name;
}
/**
* Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default
* there will be <b>no</b> ErrorHandler so that error-level logging is the only result.
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
/**
* Attaches the given listeners (and their topics) to the container.
* <p>
* Note: it's possible to call this method while the container is running forcing a reinitialization of the container.
* Note however that this might cause some messages to be lost (while the container reinitializes) - hence calling
* this method at runtime is considered advanced usage.
*
* @param listeners map of message listeners and their associated topics
*/
public void setMessageListeners(Map<? extends MessageListener, Collection<? extends Topic>> listeners) {
initMapping(listeners);
}
/**
* Sets the task execution used for subscribing to Redis channels. By default, if no executor is set, the
* {@link #setTaskExecutor(Executor)} will be used. In some cases, this might be undersired as the listening to the
* connection is a long running task.
* <p>
* Note: This implementation uses at most one long running thread (depending on whether there are any listeners
* registered or not) and up to two threads during the initial registration.
*
* @param subscriptionExecutor The subscriptionExecutor to set.
*/
public void setSubscriptionExecutor(Executor subscriptionExecutor) {
this.subscriptionExecutor = subscriptionExecutor;
}
/**
* Sets the task executor used for running the message listeners when messages are received. If no task executor is
* set, an instance of {@link SimpleAsyncTaskExecutor} will be used by default. The task executor can be adjusted
* depending on the work done by the listeners and the number of messages coming in.
*
* @param taskExecutor The taskExecutor to set.
*/
public void setTaskExecutor(Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}
/**
* Sets the serializer for converting the {@link Topic}s into low-level channels and patterns. By default,
* {@link StringRedisSerializer} is used.
*
* @param serializer The serializer to set.
*/
public void setTopicSerializer(RedisSerializer<String> serializer) {
this.serializer = serializer;
}
/**
* Adds a message listener to the (potentially running) container. If the container is running, the listener starts
* receiving (matching) messages as soon as possible.
@@ -630,16 +668,14 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
Collection<MessageListener> collection = resolveMessageListeners(this.channelMapping, serializedTopic);
collection.add(listener);
channels.add(serializedTopic.getArray());
logTrace(() -> "Adding listener '" + listener + "' on channel '" + topic.getTopic() + "'");
}
else if (topic instanceof PatternTopic) {
logTrace(() -> String.format("Adding listener '%s' on channel '%s'", listener, topic.getTopic()));
} else if (topic instanceof PatternTopic) {
Collection<MessageListener> collection = resolveMessageListeners(this.patternMapping, serializedTopic);
collection.add(listener);
patterns.add(serializedTopic.getArray());
logTrace(() -> "Adding listener '" + listener + "' for pattern '" + topic.getTopic() + "'");
}
else {
throw new IllegalArgumentException("Unknown topic type '" + topic.getClass() + "'");
logTrace(() -> String.format("Adding listener '%s' for pattern '%s'", listener, topic.getTopic()));
} else {
throw new IllegalArgumentException(String.format("Unknown topic type '%s'", topic.getClass()));
}
}
boolean wasListening = isListening();
@@ -653,8 +689,8 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
CompletableFuture<Void> future = new CompletableFuture<>();
getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns,
channels, () -> future.complete(null)));
getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(
patterns, channels, () -> future.complete(null)));
getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[channels.size()][]));
getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[patterns.size()][]));
@@ -672,26 +708,6 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
}
private void logDebug(Supplier<String> message) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(message.get());
}
}
private void logTrace(Supplier<String> message) {
if (this.logger.isTraceEnabled()) {
this.logger.trace(message.get());
}
}
private SynchronizingMessageListener.SubscriptionSynchronization newSubscriptionSynchronization(
Collection<byte[]> patterns, Collection<byte[]> channels, Runnable doneCallback) {
return new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, doneCallback);
}
private Collection<MessageListener> resolveMessageListeners(
Map<ByteArrayWrapper, Collection<MessageListener>> mapping, ByteArrayWrapper topic) {
@@ -718,8 +734,6 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
topics = listenerTopics.get(listener);
}
boolean trace = logger.isTraceEnabled();
// check stop listening case
if (CollectionUtils.isEmpty(topics)) {
stopListening();
@@ -745,18 +759,12 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
if (topic instanceof ChannelTopic) {
remove(listener, topic, holder, channelMapping, channelsToRemove);
if (trace) {
logger.trace("Removing listener '" + listener + "' from channel '" + topic.getTopic() + "'");
}
logTrace(() -> String.format("Removing listener '%s' from channel '%s'", listener, topic.getTopic()));
}
else if (topic instanceof PatternTopic) {
remove(listener, topic, holder, patternMapping, patternsToRemove);
if (trace) {
logger.trace("Removing listener '" + listener + "' from pattern '" + topic.getTopic() + "'");
}
logTrace(() -> String.format("Removing listener '%s' from pattern '%s'", listener, topic.getTopic()));
}
}
@@ -772,7 +780,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
}
private void remove(MessageListener listener, Topic topic, ByteArrayWrapper holder,
private void remove(@Nullable MessageListener listener, Topic topic, ByteArrayWrapper holder,
Map<ByteArrayWrapper, Collection<MessageListener>> mapping, List<byte[]> topicToRemove) {
Collection<MessageListener> listeners = mapping.get(holder);
@@ -802,47 +810,6 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
}
/**
* Specify the interval between recovery attempts, in <b>milliseconds</b>. The default is 5000 ms, that is, 5 seconds.
*
* @see #handleSubscriptionException
* @see #setRecoveryBackoff(BackOff)
*/
public void setRecoveryInterval(long recoveryInterval) {
setRecoveryBackoff(new FixedBackOff(recoveryInterval, FixedBackOff.UNLIMITED_ATTEMPTS));
}
/**
* Specify the interval {@link BackOff} recovery attempts.
*
* @see #handleSubscriptionException
* @see #setRecoveryInterval(long)
* @since 2.7
*/
public void setRecoveryBackoff(BackOff recoveryInterval) {
Assert.notNull(recoveryInterval, "Recovery interval must not be null");
this.backOff = recoveryInterval;
}
public long getMaxSubscriptionRegistrationWaitingTime() {
return this.maxSubscriptionRegistrationWaitingTime;
}
/**
* Specify the max time to wait for subscription registrations, in <strong>milliseconds</strong> The default is
* {@code 2000ms}, that is, 2 second. The timeout applies for awaiting the subscription registration. Note that
* subscriptions can be created asynchronously and an expired timeout does not cancel the timeout.
*
* @param maxSubscriptionRegistrationWaitingTime the maximum subscription registration wait time
* @see #DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME
* @see #start()
*/
public void setMaxSubscriptionRegistrationWaitingTime(long maxSubscriptionRegistrationWaitingTime) {
this.maxSubscriptionRegistrationWaitingTime = maxSubscriptionRegistrationWaitingTime;
}
private Subscriber createSubscriber(RedisConnectionFactory connectionFactory, Executor executor) {
return ConnectionUtils.isAsync(connectionFactory) ? new Subscriber(connectionFactory)
: new BlockingSubscriber(connectionFactory, executor);
@@ -1038,6 +1005,20 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
return serializer.serialize(topic.getTopic());
}
private void logDebug(Supplier<String> message) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(message.get());
}
}
private void logTrace(Supplier<String> message) {
if (this.logger.isTraceEnabled()) {
this.logger.trace(message.get());
}
}
/**
* Represents an operation that accepts three input arguments {@link SubscriptionListener},
* {@code channel or pattern}, and {@code count} and returns no result.