Closes #2690
Original pull request: #2691
This commit is contained in:
John Blum
2023-09-12 17:48:13 -07:00
parent ec4370f060
commit 75eed8f11a
8 changed files with 302 additions and 239 deletions

View File

@@ -69,15 +69,15 @@ public class RedisCache extends AbstractValueAdaptingCache {
private final String name;
/**
* Create a new {@link RedisCache}.
* Create a new {@link RedisCache} with the given {@link String name}.
*
* @param name {@link String name} for this {@link Cache}; must not be {@literal null}.
* @param cacheWriter {@link RedisCacheWriter} used to perform {@link RedisCache} operations by executing the
* necessary Redis commands; must not be {@literal null}.
* @param cacheConfiguration {@link RedisCacheConfiguration} applied to this {@link RedisCache} on creation; must not
* be {@literal null}.
* @param cacheWriter {@link RedisCacheWriter} used to perform {@link RedisCache} operations by
* executing the necessary Redis commands; must not be {@literal null}.
* @param cacheConfiguration {@link RedisCacheConfiguration} applied to this {@link RedisCache} on creation;
* must not be {@literal null}.
* @throws IllegalArgumentException if either the given {@link RedisCacheWriter} or {@link RedisCacheConfiguration}
* are {@literal null} or the given {@link String} name for this {@link RedisCache} is {@literal null}.
* are {@literal null} or the given {@link String} name for this {@link RedisCache} is {@literal null}.
*/
protected RedisCache(String name, RedisCacheWriter cacheWriter, RedisCacheConfiguration cacheConfiguration) {
@@ -160,15 +160,14 @@ public class RedisCache extends AbstractValueAdaptingCache {
private <T> T getSynchronized(Object key, Callable<T> valueLoader) {
lock.lock();
try {
ValueWrapper result = get(key);
return result != null ? (T) result.get() : loadCacheValue(key, valueLoader);
} finally {
lock.unlock();
}
}
/**
@@ -422,6 +421,7 @@ public class RedisCache extends AbstractValueAdaptingCache {
target.append("}");
return target.toString();
} else if (source.isCollection() || source.isArray()) {
StringJoiner stringJoiner = new StringJoiner(",");

View File

@@ -29,7 +29,7 @@ import org.springframework.dao.DataAccessException;
* The methods follow as much as possible the Redis names and conventions.
* <p>
* {@link RedisConnection Redis connections}, unlike perhaps their underlying native connection are not Thread-safe and
* should not be shared across multiple threads.
* should not be shared across multiple threads, concurrently or simultaneously.
*
* @author Costin Leau
* @author Christoph Strobl

View File

@@ -805,12 +805,14 @@ public class JedisClusterConnection implements RedisClusterConnection {
*/
public static class JedisClusterTopologyProvider implements ClusterTopologyProvider {
private final Object lock = new Object();
private final JedisCluster cluster;
private final long cacheTimeMs;
private long time = 0;
private final long cacheTimeMs;
private @Nullable ClusterTopology cached;
private final JedisCluster cluster;
/**
* Create new {@link JedisClusterTopologyProvider}. Uses a default cache timeout of 100 milliseconds.
*
@@ -847,6 +849,7 @@ public class JedisClusterConnection implements RedisClusterConnection {
Map<String, Exception> errors = new LinkedHashMap<>();
List<Entry<String, ConnectionPool>> list = new ArrayList<>(cluster.getClusterNodes().entrySet());
Collections.shuffle(list);
for (Entry<String, ConnectionPool> entry : list) {
@@ -854,25 +857,26 @@ public class JedisClusterConnection implements RedisClusterConnection {
try (Connection connection = entry.getValue().getResource()) {
time = System.currentTimeMillis();
Set<RedisClusterNode> nodes = Converters.toSetOfRedisClusterNodes(new Jedis(connection).clusterNodes());
synchronized (lock) {
cached = new ClusterTopology(nodes);
}
cached = new ClusterTopology(nodes);
return cached;
} catch (Exception ex) {
errors.put(entry.getKey(), ex);
} catch (Exception cause) {
errors.put(entry.getKey(), cause);
}
}
StringBuilder sb = new StringBuilder();
StringBuilder stringBuilder = new StringBuilder();
for (Entry<String, Exception> entry : errors.entrySet()) {
sb.append(String.format("\r\n\t- %s failed: %s", entry.getKey(), entry.getValue().getMessage()));
stringBuilder.append(String.format("\r\n\t- %s failed: %s", entry.getKey(), entry.getValue().getMessage()));
}
throw new ClusterStateFailureException(
"Could not retrieve cluster information; CLUSTER NODES returned with error" + sb.toString());
"Could not retrieve cluster information; CLUSTER NODES returned with error" + stringBuilder);
}
/**

View File

@@ -33,22 +33,26 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Connection provider for Cluster connections.
* {@link LettuceConnectionProvider} and {@link RedisClientProvider} for Redis Cluster connections.
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Bruce Cloud
* @author John Blum
* @since 2.0
*/
class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClientProvider {
private final RedisClusterClient client;
private final RedisCodec<?, ?> codec;
private final Optional<ReadFrom> readFrom;
private volatile boolean initialized;
private final Lock lock = new ReentrantLock();
private volatile boolean initialized;
@Nullable
private final ReadFrom readFrom;
private final RedisClusterClient client;
private final RedisCodec<?, ?> codec;
/**
* Create new {@link ClusterConnectionProvider}.
@@ -75,7 +79,11 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien
this.client = client;
this.codec = codec;
this.readFrom = Optional.ofNullable(readFrom);
this.readFrom = readFrom;
}
private Optional<ReadFrom> getReadFrom() {
return Optional.ofNullable(this.readFrom);
}
@Override
@@ -83,10 +91,10 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien
if (!initialized) {
// partitions have to be initialized before asynchronous usage.
// Needs to happen only once. Initialize eagerly if
// blocking is not an options.
// Partitions have to be initialized before asynchronous usage.
// Needs to happen only once. Initialize eagerly if blocking is not an options.
lock.lock();
try {
if (!initialized) {
client.getPartitions();
@@ -100,27 +108,25 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien
if (connectionType.equals(StatefulRedisPubSubConnection.class)
|| connectionType.equals(StatefulRedisClusterPubSubConnection.class)) {
return client.connectPubSubAsync(codec) //
.thenApply(connectionType::cast);
return client.connectPubSubAsync(codec).thenApply(connectionType::cast);
}
if (StatefulRedisClusterConnection.class.isAssignableFrom(connectionType)
|| connectionType.equals(StatefulConnection.class)) {
return client.connectAsync(codec) //
.thenApply(connection -> {
readFrom.ifPresent(connection::setReadFrom);
return client.connectAsync(codec).thenApply(connection -> {
getReadFrom().ifPresent(connection::setReadFrom);
return connectionType.cast(connection);
});
}
return LettuceFutureUtils
.failed(new InvalidDataAccessApiUsageException("Connection type " + connectionType + " not supported"));
String message = String.format("Connection type %s not supported", connectionType);
return LettuceFutureUtils.failed(new InvalidDataAccessApiUsageException(message));
}
@Override
public RedisClusterClient getRedisClient() {
return client;
return this.client;
}
}

View File

@@ -554,7 +554,9 @@ public class LettuceClusterConnection extends LettuceConnection
static class LettuceClusterNodeResourceProvider implements ClusterNodeResourceProvider, DisposableBean {
private final Lock lock = new ReentrantLock();
private final LettuceConnectionProvider connectionProvider;
private volatile @Nullable StatefulRedisClusterConnection<byte[], byte[]> connection;
LettuceClusterNodeResourceProvider(LettuceConnectionProvider connectionProvider) {
@@ -567,14 +569,16 @@ public class LettuceClusterConnection extends LettuceConnection
Assert.notNull(node, "Node must not be null");
if (connection == null) {
lock.lock();
if (this.connection == null) {
this.lock.lock();
try {
if (connection == null) {
this.connection = connectionProvider.getConnection(StatefulRedisClusterConnection.class);
if (this.connection == null) {
this.connection = this.connectionProvider.getConnection(StatefulRedisClusterConnection.class);
}
} finally {
lock.unlock();
this.lock.unlock();
}
}
@@ -586,6 +590,7 @@ public class LettuceClusterConnection extends LettuceConnection
@Override
public void destroy() throws Exception {
if (this.connection != null) {
this.connectionProvider.release(this.connection);
}

View File

@@ -15,7 +15,8 @@
*/
package org.springframework.data.redis.connection.lettuce;
import static org.springframework.data.redis.connection.lettuce.LettuceConnection.*;
import static org.springframework.data.redis.connection.lettuce.LettuceConnection.CODEC;
import static org.springframework.data.redis.connection.lettuce.LettuceConnection.PipeliningFlushPolicy;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
@@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
@@ -57,10 +59,24 @@ import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
import org.springframework.data.redis.PassThroughExceptionTranslationStrategy;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.ClusterCommandExecutor;
import org.springframework.data.redis.connection.ClusterTopologyProvider;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisConfiguration;
import org.springframework.data.redis.connection.RedisConfiguration.ClusterConfiguration;
import org.springframework.data.redis.connection.RedisConfiguration.WithDatabaseIndex;
import org.springframework.data.redis.connection.RedisConfiguration.WithPassword;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.RedisSentinelConnection;
import org.springframework.data.redis.connection.RedisSocketConfiguration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.RedisStaticMasterReplicaConfiguration;
import org.springframework.data.redis.util.RedisAssertions;
import org.springframework.data.util.Optionals;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@@ -69,7 +85,7 @@ import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
* Connection factory creating <a href="https://github.com/mp911de/lettuce">Lettuce</a>-based connections.
* {@link RedisConnectionFactory Connection factory} creating <a href="https://lettuce.io/">Lettuce</a>-based connections.
* <p>
* This factory creates a new {@link LettuceConnection} on each call to {@link #getConnection()}. While multiple
* {@link LettuceConnection}s share a single thread-safe native connection by default, {@link LettuceConnection} and its
@@ -109,6 +125,7 @@ import org.springframework.util.StringUtils;
* @author Luis De Bello
* @author Andrea Como
* @author Chris Bono
* @author John Blum
*/
public class LettuceConnectionFactory implements RedisConnectionFactory, ReactiveRedisConnectionFactory,
InitializingBean, DisposableBean, SmartLifecycle {
@@ -116,10 +133,10 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
LettuceExceptionConverter.INSTANCE);
private boolean validateConnection = false;
private boolean shareNativeConnection = true;
private boolean eagerInitialization = false;
private boolean convertPipelineAndTxResults = true;
private boolean eagerInitialization = false;
private boolean shareNativeConnection = true;
private boolean validateConnection = false;
private int phase = 0; // in between min and max values
@@ -138,9 +155,6 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
private final Log log = LogFactory.getLog(getClass());
/** Synchronization monitor for the shared Connection */
private final Object connectionMonitor = new Object();
private final Lock lock = new ReentrantLock();
private PipeliningFlushPolicy pipeliningFlushPolicy = PipeliningFlushPolicy.flushEachCommand();
@@ -349,9 +363,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
ClusterCommandExecutor getRequiredClusterCommandExecutor() {
if (this.clusterCommandExecutor == null) {
throw new IllegalStateException("ClusterCommandExecutor not initialized");
}
Assert.state(this.clusterCommandExecutor != null, "ClusterCommandExecutor not initialized");
return this.clusterCommandExecutor;
}
@@ -659,11 +671,8 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
*/
public AbstractRedisClient getRequiredNativeClient() {
AbstractRedisClient client = getNativeClient();
Assert.state(client != null, "Client not yet initialized; Did you forget to call initialize the bean");
return client;
return RedisAssertions.requireState(getNativeClient(),
"Client not yet initialized; Did you forget to call initialize the bean");
}
@Nullable
@@ -823,7 +832,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
* @since 2.1
*/
private boolean isStaticMasterReplicaAware() {
return RedisConfiguration.isStaticMasterReplicaConfiguration(configuration);
return RedisConfiguration.isStaticMasterReplicaConfiguration(this.configuration);
}
/**
@@ -831,7 +840,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
* @since 1.5
*/
public boolean isRedisSentinelAware() {
return RedisConfiguration.isSentinelConfiguration(configuration);
return RedisConfiguration.isSentinelConfiguration(this.configuration);
}
/**
@@ -839,7 +848,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
* @since 2.1
*/
private boolean isDomainSocketAware() {
return RedisConfiguration.isDomainSocketConfiguration(configuration);
return RedisConfiguration.isDomainSocketConfiguration(this.configuration);
}
/**
@@ -847,7 +856,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
* @since 1.7
*/
public boolean isClusterAware() {
return RedisConfiguration.isClusterConfiguration(configuration);
return RedisConfiguration.isClusterConfiguration(this.configuration);
}
@Override
@@ -858,12 +867,15 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
if (isCreatedOrStopped(current)) {
AbstractRedisClient client = createClient();
this.client = client;
LettuceConnectionProvider connectionProvider = new ExceptionTranslatingConnectionProvider(
createConnectionProvider(this.client, CODEC));
createConnectionProvider(client, CODEC));
this.connectionProvider = connectionProvider;
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
createConnectionProvider(this.client, LettuceReactiveRedisConnection.CODEC));
createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));
if (isClusterAware()) {
this.clusterCommandExecutor = createClusterCommandExecutor((RedisClusterClient) client, connectionProvider);
@@ -903,7 +915,6 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
reactiveConnectionProvider = null;
if (client != null) {
try {
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
Duration timeout = clientConfiguration.getShutdownTimeout();
@@ -923,7 +934,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
@Override
public int getPhase() {
return phase;
return this.phase;
}
/**
@@ -943,6 +954,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
@Override
public void afterPropertiesSet() {
if (isAutoStartup()) {
start();
}
@@ -990,8 +1002,8 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
return getClusterConnection();
}
LettuceConnection connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider, getTimeout(),
getDatabase());
LettuceConnection connection =
doCreateLettuceConnection(getSharedConnection(), this.connectionProvider, getTimeout(), getDatabase());
connection.setConvertPipelineAndTxResults(this.convertPipelineAndTxResults);
@@ -1007,7 +1019,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
throw new InvalidDataAccessApiUsageException("Cluster is not configured");
}
RedisClusterClient clusterClient = (RedisClusterClient) client;
RedisClusterClient clusterClient = (RedisClusterClient) this.client;
StatefulRedisClusterConnection<byte[], byte[]> sharedConnection = getSharedClusterConnection();
@@ -1017,6 +1029,14 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
getRequiredClusterCommandExecutor(), this.clientConfiguration.getCommandTimeout());
}
@Override
public RedisSentinelConnection getSentinelConnection() {
assertStarted();
return new LettuceSentinelConnection(this.connectionProvider);
}
/**
* Customization hook for {@link LettuceConnection} creation.
*
@@ -1118,14 +1138,14 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
*/
public void resetConnection() {
Optionals.toStream(Optional.ofNullable(connection), Optional.ofNullable(reactiveConnection))
.forEach(SharedConnection::resetConnection);
doInLock(() -> {
synchronized (this.connectionMonitor) {
Optionals.toStream(Optional.ofNullable(this.connection), Optional.ofNullable(this.reactiveConnection))
.forEach(SharedConnection::resetConnection);
this.connection = null;
this.reactiveConnection = null;
}
});
}
/**
@@ -1141,26 +1161,26 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
private SharedConnection<byte[]> getOrCreateSharedConnection() {
synchronized (this.connectionMonitor) {
return doInLock(() -> {
if (this.connection == null) {
this.connection = new SharedConnection<>(connectionProvider);
this.connection = new SharedConnection<>(this.connectionProvider);
}
return this.connection;
}
});
}
private SharedConnection<ByteBuffer> getOrCreateSharedReactiveConnection() {
synchronized (this.connectionMonitor) {
return doInLock(() -> {
if (this.reactiveConnection == null) {
this.reactiveConnection = new SharedConnection<>(reactiveConnectionProvider);
this.reactiveConnection = new SharedConnection<>(this.reactiveConnectionProvider);
}
return this.reactiveConnection;
}
});
}
@Override
@@ -1231,7 +1251,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
return isStaticMasterReplicaAware() ? createStaticMasterReplicaConnectionProvider((RedisClient) client, codec)
: isClusterAware() ? createClusterConnectionProvider((RedisClusterClient) client, codec)
: createStandaloneConnectionProvider((RedisClient) client, codec);
: createStandaloneConnectionProvider((RedisClient) client, codec);
}
@SuppressWarnings("all")
@@ -1258,7 +1278,8 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
return isStaticMasterReplicaAware() ? createStaticMasterReplicaClient()
: isRedisSentinelAware() ? createSentinelClient()
: isClusterAware() ? createClusterClient() : createBasicClient();
: isClusterAware() ? createClusterClient()
: createBasicClient();
}
private RedisClient createStaticMasterReplicaClient() {
@@ -1324,7 +1345,8 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
ClusterConfiguration clusterConfiguration = (ClusterConfiguration) this.configuration;
clusterConfiguration.getClusterNodes().stream()
.map(node -> createRedisURIAndApplySettings(node.getHost(), node.getPort())).forEach(initialUris::add);
.map(node -> createRedisURIAndApplySettings(node.getHost(), node.getPort()))
.forEach(initialUris::add);
RedisClusterClient clusterClient = this.clientConfiguration.getClientResources()
.map(clientResources -> RedisClusterClient.create(clientResources, initialUris))
@@ -1368,7 +1390,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
private void assertStarted() {
State current = state.get();
State current = this.state.get();
if (State.STARTED.equals(current)) {
return;
@@ -1408,16 +1430,13 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
private RedisURI createRedisSocketURIAndApplySettings(String socketPath) {
RedisURI.Builder builder = RedisURI.Builder.socket(socketPath);
applyAuthentication(builder);
builder.withDatabase(getDatabase());
builder.withTimeout(clientConfiguration.getCommandTimeout());
return builder.build();
return applyAuthentication(RedisURI.Builder.socket(socketPath))
.withTimeout(this.clientConfiguration.getCommandTimeout())
.withDatabase(getDatabase())
.build();
}
private void applyAuthentication(RedisURI.Builder builder) {
private RedisURI.Builder applyAuthentication(RedisURI.Builder builder) {
String username = getRedisUsername();
@@ -1428,16 +1447,10 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
getRedisPassword().toOptional().ifPresent(builder::withPassword);
}
clientConfiguration.getRedisCredentialsProviderFactory()
this.clientConfiguration.getRedisCredentialsProviderFactory()
.ifPresent(factory -> builder.withAuthentication(factory.createCredentialsProvider(this.configuration)));
}
@Override
public RedisSentinelConnection getSentinelConnection() {
assertStarted();
return new LettuceSentinelConnection(connectionProvider);
return builder;
}
private MutableLettuceClientConfiguration getMutableConfiguration() {
@@ -1453,22 +1466,36 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
return clientConfiguration.getCommandTimeout().toMillis();
}
private void doInLock(Runnable runnable) {
doInLock(() -> { runnable.run(); return null; });
}
private <T> T doInLock(Supplier<T> supplier) {
this.lock.lock();
try {
return supplier.get();
}
finally {
this.lock.unlock();
}
}
/**
* Wrapper for shared connections. Keeps track of the connection lifecycleThe wrapper is thread-safe as it
* Wrapper for shared connections. Keeps track of the connection lifecycle. The wrapper is Thread-safe as it
* synchronizes concurrent calls by blocking.
*
* @param <E> connection encoding.
* @author Mark Paluch
* @author Christoph Strobl
* @author Jonn Blum
* @since 2.1
*/
class SharedConnection<E> {
private final LettuceConnectionProvider connectionProvider;
/** Synchronization monitor for the shared Connection */
private final Object connectionMonitor = new Object();
private @Nullable StatefulConnection<E, E> connection;
SharedConnection(LettuceConnectionProvider connectionProvider) {
@@ -1476,16 +1503,16 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
}
/**
* Returns a valid Lettuce connection. Initializes and validates the connection if
* {@link #setValidateConnection(boolean) enabled}.
* Returns a valid Lettuce {@link StatefulConnection connection}.
* <p>
* Initializes and validates the connection if {@link #setValidateConnection(boolean) enabled}.
*
* @return the connection.
* @return the new Lettuce {@link StatefulConnection connection}.
*/
@Nullable
StatefulConnection<E, E> getConnection() {
lock.lock();
try {
return doInLock(() -> {
if (this.connection == null) {
this.connection = getNativeConnection();
@@ -1496,74 +1523,88 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
}
return this.connection;
} finally {
lock.unlock();
}
});
}
/**
* Obtain a connection from the associated {@link LettuceConnectionProvider}.
* Acquire a {@link StatefulConnection native connection} from the associated {@link LettuceConnectionProvider}.
*
* @return the connection.
* @return a new {@link StatefulConnection native connection}.
* @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#getConnection(Class)
* @see io.lettuce.core.api.StatefulConnection
*/
@SuppressWarnings("unchecked")
private StatefulConnection<E, E> getNativeConnection() {
return connectionProvider.getConnection(StatefulConnection.class);
return this.connectionProvider.getConnection(StatefulConnection.class);
}
/**
* Validate the connection. Invalid connections will be closed and the connection state will be reset.
* Null-safe operation to evaluate whether the given {@link StatefulConnection connetion}
* is {@link StatefulConnection#isOpen() open}.
*
* @param connection {@link StatefulConnection} to evaluate.
* @return a boolean value indicating whether the given {@link StatefulConnection} is not {@literal null}
* and is {@link StatefulConnection#isOpen() open}.
* @see io.lettuce.core.api.StatefulConnection#isOpen()
*/
private boolean isOpen(@Nullable StatefulConnection<?, ?> connection) {
return connection != null && connection.isOpen();
}
/**
* Validate the {@link StatefulConnection connection}.
* <p>
* {@link StatefulConnection Connections} are considered valid if they can send/receive ping packets.
* Invalid {@link StatefulConnection connections} will be closed and the connection state will be reset.
*/
void validateConnection() {
lock.lock();
try {
doInLock(() -> {
StatefulConnection<?, ?> connection = this.connection;
boolean valid = false;
if (connection != null && connection.isOpen()) {
if (isOpen(connection)) {
try {
if (connection instanceof StatefulRedisConnection) {
((StatefulRedisConnection) connection).sync().ping();
if (connection instanceof StatefulRedisConnection<?, ?> statefulConnection) {
statefulConnection.sync().ping();
}
if (connection instanceof StatefulRedisClusterConnection) {
((StatefulRedisClusterConnection) connection).sync().ping();
if (connection instanceof StatefulRedisClusterConnection<?, ?> statefulClusterConnection) {
statefulClusterConnection.sync().ping();
}
valid = true;
} catch (Exception cause) {
log.debug("Validation failed", cause);
}
}
if (!valid) {
log.info("Validation of shared connection failed; Creating a new connection.");
resetConnection();
this.connection = getNativeConnection();
}
} finally {
lock.unlock();
}
});
}
/**
* Reset the underlying shared Connection, to be reinitialized on next access.
* Reset the underlying shared {@link StatefulConnection connection}, to be reinitialized on next access.
*/
void resetConnection() {
lock.lock();
try {
doInLock(() -> {
if (this.connection != null) {
this.connectionProvider.release(this.connection);
StatefulConnection<?, ?> connection = this.connection;
if (connection != null) {
this.connectionProvider.release(connection);
}
this.connection = null;
} finally {
lock.unlock();
}
});
}
}

View File

@@ -39,11 +39,13 @@ import org.springframework.util.Assert;
*/
public class DefaultRedisScript<T> implements RedisScript<T>, InitializingBean {
private @Nullable Class<T> resultType;
private final Lock lock = new ReentrantLock();
private @Nullable ScriptSource scriptSource;
private @Nullable String sha1;
private @Nullable Class<T> resultType;
/**
* Creates a new {@link DefaultRedisScript}
@@ -81,6 +83,7 @@ public class DefaultRedisScript<T> implements RedisScript<T>, InitializingBean {
public String getSha1() {
lock.lock();
try {
if (sha1 == null || scriptSource.isModified()) {
this.sha1 = DigestUtils.sha1DigestAsHex(getScriptAsString());

View File

@@ -125,34 +125,29 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
/** Logger available to subclasses */
protected final Log logger = LogFactory.getLog(getClass());
private @Nullable ErrorHandler errorHandler;
private @Nullable Executor subscriptionExecutor;
private @Nullable Executor taskExecutor;
private @Nullable RedisConnectionFactory connectionFactory;
private RedisSerializer<String> serializer = RedisSerializer.string();
private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME;
private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);
private @Nullable String beanName;
// whether the container has been initialized via afterPropertiesSet
private boolean afterPropertiesSet = false;
// whether the TaskExecutor was created by the container
private boolean manageExecutor = false;
private @Nullable Subscriber subscriber;
private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME;
private final AtomicBoolean started = new AtomicBoolean();
// whether the container is running (or not)
private final AtomicReference<State> state = new AtomicReference<>(State.notListening());
private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);
private volatile CompletableFuture<Void> listenFuture = new CompletableFuture<>();
private volatile CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
private @Nullable ErrorHandler errorHandler;
private @Nullable Executor subscriptionExecutor;
private @Nullable Executor taskExecutor;
// Lookup maps; to avoid creation of hashes for each message, the maps use raw byte arrays (wrapped to respect
// the equals/hashcode contract)
@@ -163,9 +158,13 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
// lookup map between listeners and channels
private final Map<MessageListener, Set<Topic>> listenerTopics = new ConcurrentHashMap<>();
private volatile CompletableFuture<Void> listenFuture = new CompletableFuture<>();
private @Nullable RedisConnectionFactory connectionFactory;
private volatile CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
private RedisSerializer<String> serializer = RedisSerializer.string();
private @Nullable String beanName;
private @Nullable Subscriber subscriber;
/**
* Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default,
@@ -323,8 +322,6 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
/**
* Destroy the container and stop it.
*
* @throws Exception
*/
@Override
public void destroy() throws Exception {
@@ -397,7 +394,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
private CompletableFuture<Void> lazyListen(BackOffExecution backOffExecution) {
if (!hasTopics()) {
logger.debug("Postpone listening for Redis messages until actual listeners are added");
logDebug(() -> "Postpone listening for Redis messages until actual listeners are added");
return CompletableFuture.completedFuture(null);
}
@@ -430,13 +427,14 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
CompletableFuture<Void> listenFuture = getRequiredSubscriber().initialize(backOffExecution,
patternMapping.keySet().stream().map(ByteArrayWrapper::getArray).collect(Collectors.toList()),
channelMapping.keySet().stream().map(ByteArrayWrapper::getArray).collect(Collectors.toList()));
listenFuture.whenComplete((unused, throwable) -> {
if (throwable == null) {
logger.debug("RedisMessageListenerContainer listeners registered successfully");
logDebug(() -> "RedisMessageListenerContainer listeners registered successfully");
this.state.set(State.listening());
} else {
logger.debug("Failed to start RedisMessageListenerContainer listeners", throwable);
logDebug(() -> "Failed to start RedisMessageListenerContainer listeners", throwable);
this.state.set(State.notListening());
}
@@ -448,7 +446,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
});
logger.debug("Subscribing to topics for RedisMessageListenerContainer");
logDebug(() -> "Subscribing to topics for RedisMessageListenerContainer");
return true;
}
@@ -484,18 +482,14 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
public void stop(Runnable callback) {
if (this.started.compareAndSet(true, false)) {
stopListening();
if (logger.isDebugEnabled()) {
logger.debug("Stopped RedisMessageListenerContainer");
}
logDebug(() -> "Stopped RedisMessageListenerContainer");
callback.run();
}
}
private void stopListening() {
while (!doUnsubscribe()) {
// busy-loop, allow for synchronization against doSubscribe therefore we want to retry.
}
@@ -523,9 +517,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
this.listenFuture = new CompletableFuture<>();
this.unsubscribeFuture = new CompletableFuture<>();
if (logger.isDebugEnabled()) {
logger.debug("Stopped listening");
}
logDebug(() -> "Stopped listening");
return true;
} else {
@@ -850,7 +842,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
} else {
// Rare case: listener thread failed after container shutdown.
// Log at debug level, to avoid spamming the shutdown logger.
logger.debug("Listener exception after container shutdown", cause);
logDebug(() -> "Listener exception after container shutdown", cause);
}
}
@@ -873,22 +865,23 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
* Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection
* failure (for example, Redis was restarted).
*
* @param ex Throwable exception
* @param cause Throwable exception
*/
protected void handleSubscriptionException(CompletableFuture<Void> future, BackOffExecution backOffExecution,
Throwable ex) {
Throwable cause) {
getRequiredSubscriber().closeConnection();
if (ex instanceof RedisConnectionFailureException && isRunning()) {
if (cause instanceof RedisConnectionFailureException && isRunning()) {
BackOffExecution loggingBackOffExecution = () -> {
long recoveryInterval = backOffExecution.nextBackOff();
if (recoveryInterval != BackOffExecution.STOP) {
logger.error(String.format("Connection failure occurred: %s; Restarting subscription task after %s ms", ex,
recoveryInterval), ex);
String message = String.format("Connection failure occurred: %s; Restarting subscription task after %s ms",
cause, recoveryInterval);
logger.error(message, cause);
}
return recoveryInterval;
@@ -904,16 +897,16 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
return;
}
logger.error("SubscriptionTask aborted with exception:", ex);
future.completeExceptionally(new IllegalStateException("Subscription attempts exceeded", ex));
logger.error("SubscriptionTask aborted with exception:", cause);
future.completeExceptionally(new IllegalStateException("Subscription attempts exceeded", cause));
return;
}
if (isRunning()) { // log only if the container is still running to prevent close errors from logging
logger.error("SubscriptionTask aborted with exception:", ex);
logger.error("SubscriptionTask aborted with exception:", cause);
}
future.completeExceptionally(ex);
future.completeExceptionally(cause);
}
/**
@@ -928,6 +921,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
try {
if (subscriptionExecutor instanceof ScheduledExecutorService) {
((ScheduledExecutorService) subscriptionExecutor).schedule(retryRunnable, recoveryInterval,
TimeUnit.MILLISECONDS);
@@ -937,8 +931,9 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
return true;
} catch (InterruptedException interEx) {
logger.debug("Thread interrupted while sleeping the recovery interval");
} catch (InterruptedException ignore) {
logDebug(() -> "Thread interrupted while sleeping the recovery interval");
Thread.currentThread().interrupt();
return false;
}
@@ -1014,6 +1009,13 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
}
private void logDebug(Supplier<String> message, Throwable cause) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(message.get(), cause);
}
}
private void logTrace(Supplier<String> message) {
if (this.logger.isTraceEnabled()) {
@@ -1154,11 +1156,14 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
*/
class Subscriber {
private final DispatchMessageListener delegateListener = new DispatchMessageListener();
private final Lock lock = new ReentrantLock();
private volatile @Nullable RedisConnection connection;
private final RedisConnectionFactory connectionFactory;
private final Lock lock = new ReentrantLock();
private final DispatchMessageListener delegateListener = new DispatchMessageListener();
private final SynchronizingMessageListener synchronizingMessageListener = new SynchronizingMessageListener(
delegateListener, delegateListener);
@@ -1178,18 +1183,21 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Collection<byte[]> patterns,
Collection<byte[]> channels) {
lock.lock();
try {
return doInLock(() -> {
CompletableFuture<Void> initFuture = new CompletableFuture<>();
try {
RedisConnection connection = connectionFactory.getConnection();
RedisConnection connection = this.connectionFactory.getConnection();
this.connection = connection;
if (connection.isSubscribed()) {
initFuture.completeExceptionally(
new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
return initFuture;
}
@@ -1198,14 +1206,12 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
} catch (Throwable t) {
handleSubscriptionException(initFuture, backOffExecution, t);
}
} catch (RuntimeException e) {
initFuture.completeExceptionally(e);
} catch (RuntimeException cause) {
initFuture.completeExceptionally(cause);
}
return initFuture;
} finally {
lock.unlock();
}
});
}
/**
@@ -1248,21 +1254,18 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
public void unsubscribeAll() {
lock.lock();
try {
doInLock(() -> {
RedisConnection connection = this.connection;
if (connection == null) {
return;
}
doUnsubscribe(connection);
} finally {
lock.unlock();
}
if (connection != null) {
doUnsubscribe(connection);
}
});
}
void doUnsubscribe(RedisConnection connection) {
closeSubscription(connection);
closeConnection();
@@ -1274,18 +1277,14 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
*/
public void cancel() {
lock.lock();
try {
doInLock(() -> {
RedisConnection connection = this.connection;
if (connection == null) {
return;
}
doCancel(connection);
} finally {
lock.unlock();
}
if (connection != null) {
doCancel(connection);
}
});
}
void doCancel(RedisConnection connection) {
@@ -1295,22 +1294,18 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
void closeSubscription(RedisConnection connection) {
if (logger.isTraceEnabled()) {
logger.trace("Cancelling Redis subscription...");
}
logTrace(() -> "Cancelling Redis subscription...");
Subscription sub = connection.getSubscription();
Subscription subscription = connection.getSubscription();
if (sub != null) {
if (subscription != null) {
if (logger.isTraceEnabled()) {
logger.trace("Unsubscribing from all channels");
}
logTrace(() -> "Unsubscribing from all channels");
try {
sub.close();
} catch (Exception e) {
logger.warn("Unable to unsubscribe from subscriptions", e);
subscription.close();
} catch (Exception cause) {
logger.warn("Unable to unsubscribe from subscriptions", cause);
}
}
}
@@ -1320,23 +1315,21 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
*/
public void closeConnection() {
lock.lock();
try {
doInLock(() -> {
RedisConnection connection = this.connection;
this.connection = null;
if (connection != null) {
logger.trace("Closing connection");
logTrace(() -> "Closing connection");
try {
connection.close();
} catch (Exception e) {
logger.warn("Error closing subscription connection", e);
} catch (Exception cause) {
logger.warn("Error closing subscription connection", cause);
}
}
} finally {
lock.unlock();
}
});
}
/**
@@ -1381,20 +1374,31 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
return;
}
lock.lock();
try {
doInLock(() -> {
RedisConnection connection = this.connection;
if (connection != null) {
Subscription sub = connection.getSubscription();
if (sub != null) {
function.accept(sub, data);
Subscription subscription = connection.getSubscription();
if (subscription != null) {
function.accept(subscription, data);
}
}
} finally {
lock.unlock();
}
});
}
private void doInLock(Runnable runner) {
doInLock(() -> { runner.run(); return null; });
}
private <T> T doInLock(Supplier<T> supplier) {
this.lock.lock();
try {
return supplier.get();
} finally {
this.lock.unlock();
}
}
}
/**
@@ -1443,7 +1447,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels,
() -> subscriptionDone.complete(null)));
executor.execute(() -> {
this.executor.execute(() -> {
try {
doSubscribe(connection, patterns, initiallySubscribeToChannels);