Polishing.
Reorder factory fields according to immutable configuration, mutable configuration and connection factory state. Reorder methods and switch Jedis references to its current GitHub repository. Refine assertions. Update documentation. See: #2503 Original pull request: #2627
This commit is contained in:
@@ -30,8 +30,8 @@ public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
|
||||
* Returns a suitable {@link RedisConnection connection} for interacting with Redis.
|
||||
*
|
||||
* @return {@link RedisConnection connection} for interacting with Redis.
|
||||
* @throws IllegalStateException if the connection factory requires initialization and the factory has not yet
|
||||
* been initialized.
|
||||
* @throws IllegalStateException if the connection factory requires initialization and the factory has not yet been
|
||||
* initialized.
|
||||
*/
|
||||
RedisConnection getConnection();
|
||||
|
||||
@@ -39,8 +39,8 @@ public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
|
||||
* Returns a suitable {@link RedisClusterConnection connection} for interacting with Redis Cluster.
|
||||
*
|
||||
* @return a {@link RedisClusterConnection connection} for interacting with Redis Cluster.
|
||||
* @throws IllegalStateException if the connection factory requires initialization and the factory has not yet
|
||||
* been initialized.
|
||||
* @throws IllegalStateException if the connection factory requires initialization and the factory has not yet been
|
||||
* initialized.
|
||||
* @since 1.7
|
||||
*/
|
||||
RedisClusterConnection getClusterConnection();
|
||||
@@ -48,12 +48,12 @@ public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
|
||||
/**
|
||||
* Specifies if pipelined results should be converted to the expected data type.
|
||||
* <p>
|
||||
* If {@literal false}, results of {@link RedisConnection#closePipeline()} and {@link RedisConnection#exec()}
|
||||
* will be of the type returned by the underlying driver. This method is mostly for backwards compatibility
|
||||
* with {@literal 1.0}. It is generally always a good idea to allow results to be converted and deserialized.
|
||||
* In fact, this is now the default behavior.
|
||||
* If {@literal false}, results of {@link RedisConnection#closePipeline()} and {@link RedisConnection#exec()} will be
|
||||
* of the type returned by the underlying driver. This method is mostly for backwards compatibility with
|
||||
* {@literal 1.0}. It is generally always a good idea to allow results to be converted and deserialized. In fact, this
|
||||
* is now the default behavior.
|
||||
*
|
||||
* @return a boolen indicating whether to convert pipeline and transaction results.
|
||||
* @return {@code true} to convert pipeline and transaction results; {@code false} otherwise.
|
||||
*/
|
||||
boolean getConvertPipelineAndTxResults();
|
||||
|
||||
@@ -61,8 +61,8 @@ public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
|
||||
* Returns a suitable {@link RedisSentinelConnection connection} for interacting with Redis Sentinel.
|
||||
*
|
||||
* @return a {@link RedisSentinelConnection connection} for interacting with Redis Sentinel.
|
||||
* @throws IllegalStateException if the connection factory requires initialization and the factory has not yet
|
||||
* been initialized.
|
||||
* @throws IllegalStateException if the connection factory requires initialization and the factory has not yet been
|
||||
* initialized.
|
||||
* @since 1.4
|
||||
*/
|
||||
RedisSentinelConnection getSentinelConnection();
|
||||
|
||||
@@ -64,7 +64,7 @@ import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* Connection factory creating <a href="https://github.com/xetorthio/jedis">Jedis</a> based connections.
|
||||
* Connection factory creating <a href="https://github.com/redis/jedis">Jedis</a> based connections.
|
||||
* <p>
|
||||
* {@link JedisConnectionFactory} should be configured using an environmental configuration and the
|
||||
* {@link JedisClientConfiguration client configuration}. Jedis supports the following environmental configurations:
|
||||
@@ -74,8 +74,9 @@ import org.springframework.util.ObjectUtils;
|
||||
* <li>{@link RedisClusterConfiguration}</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* This connection factory must be {@link #afterPropertiesSet() initialized} prior to {@link #getConnection obtaining
|
||||
* connections}.
|
||||
* This connection factory must be {@link #afterPropertiesSet() initialized} and {@link SmartLifecycle#start() started}
|
||||
* prior to {@link #getConnection obtaining connections}. You can {@link SmartLifecycle#stop()} and
|
||||
* {@link SmartLifecycle#start() restart} this connection factory if needed.
|
||||
*
|
||||
* @author Costin Leau
|
||||
* @author Thomas Darimont
|
||||
@@ -86,30 +87,36 @@ import org.springframework.util.ObjectUtils;
|
||||
* @see JedisClientConfiguration
|
||||
* @see Jedis
|
||||
*/
|
||||
public class JedisConnectionFactory implements RedisConnectionFactory, InitializingBean, DisposableBean, SmartLifecycle {
|
||||
public class JedisConnectionFactory
|
||||
implements RedisConnectionFactory, InitializingBean, DisposableBean, SmartLifecycle {
|
||||
|
||||
private final static Log log = LogFactory.getLog(JedisConnectionFactory.class);
|
||||
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
|
||||
JedisExceptionConverter.INSTANCE);
|
||||
|
||||
private final JedisClientConfiguration clientConfiguration;
|
||||
private JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().build();
|
||||
private @Nullable Pool<Jedis> pool;
|
||||
|
||||
private boolean convertPipelineAndTxResults = true;
|
||||
private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost",
|
||||
Protocol.DEFAULT_PORT);
|
||||
|
||||
private @Nullable RedisConfiguration configuration;
|
||||
|
||||
private @Nullable JedisCluster cluster;
|
||||
private @Nullable ClusterTopologyProvider topologyProvider;
|
||||
private @Nullable ClusterCommandExecutor clusterCommandExecutor;
|
||||
|
||||
/**
|
||||
* Lifecycle state of this factory.
|
||||
*/
|
||||
enum State {
|
||||
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
|
||||
}
|
||||
|
||||
private AtomicReference<State> state = new AtomicReference<>(State.CREATED);
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);
|
||||
|
||||
private JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().build();
|
||||
|
||||
private @Nullable Pool<Jedis> pool;
|
||||
private @Nullable JedisCluster cluster;
|
||||
private @Nullable ClusterTopologyProvider topologyProvider;
|
||||
private @Nullable ClusterCommandExecutor clusterCommandExecutor;
|
||||
|
||||
/**
|
||||
* Constructs a new {@link JedisConnectionFactory} instance with default settings (default connection pooling).
|
||||
@@ -253,115 +260,6 @@ public class JedisConnectionFactory implements RedisConnectionFactory, Initializ
|
||||
this.configuration = clusterConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Jedis instance to be used as a Redis connection. The instance can be newly created or retrieved from a
|
||||
* pool.
|
||||
*
|
||||
* @return Jedis instance ready for wrapping into a {@link RedisConnection}.
|
||||
*/
|
||||
protected Jedis fetchJedisConnector() {
|
||||
try {
|
||||
|
||||
if (getUsePool() && pool != null) {
|
||||
return pool.getResource();
|
||||
}
|
||||
|
||||
Jedis jedis = createJedis();
|
||||
// force initialization (see Jedis issue #82)
|
||||
jedis.connect();
|
||||
|
||||
return jedis;
|
||||
} catch (Exception ex) {
|
||||
throw new RedisConnectionFailureException("Cannot get Jedis connection", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private Jedis createJedis() {
|
||||
return new Jedis(new HostAndPort(getHostName(), getPort()), this.clientConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Post process a newly retrieved connection. Useful for decorating or executing initialization commands on a new
|
||||
* connection. This implementation simply returns the connection.
|
||||
*
|
||||
* @param connection the jedis connection.
|
||||
* @return processed connection
|
||||
*/
|
||||
protected JedisConnection postProcessConnection(JedisConnection connection) {
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
State current = state.getAndUpdate(state -> {
|
||||
if (State.CREATED.equals(state) || State.STOPPED.equals(state)) {
|
||||
return State.STARTING;
|
||||
}
|
||||
return state;
|
||||
});
|
||||
|
||||
if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {
|
||||
|
||||
if (getUsePool() && !isRedisClusterAware()) {
|
||||
this.pool = createPool();
|
||||
}
|
||||
|
||||
if (isRedisClusterAware()) {
|
||||
|
||||
this.cluster = createCluster();
|
||||
this.topologyProvider = createTopologyProvider(this.cluster);
|
||||
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
|
||||
new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
|
||||
EXCEPTION_TRANSLATION);
|
||||
}
|
||||
|
||||
state.set(State.STARTED);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
|
||||
if (getUsePool() && !isRedisClusterAware()) {
|
||||
if (pool != null) {
|
||||
try {
|
||||
this.pool.close();
|
||||
} catch (Exception ex) {
|
||||
log.warn("Cannot properly close Jedis pool", ex);
|
||||
}
|
||||
this.pool = null;
|
||||
}
|
||||
}
|
||||
|
||||
if(this.clusterCommandExecutor != null) {
|
||||
try {
|
||||
this.clusterCommandExecutor.destroy();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.cluster != null) {
|
||||
|
||||
this.topologyProvider = null;
|
||||
|
||||
try {
|
||||
cluster.close();
|
||||
} catch (Exception ex) {
|
||||
log.warn("Cannot properly close Jedis cluster", ex);
|
||||
}
|
||||
}
|
||||
state.set(State.STOPPED);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return State.STARTED.equals(state.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
|
||||
@@ -399,6 +297,76 @@ public class JedisConnectionFactory implements RedisConnectionFactory, Initializ
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
State current = state
|
||||
.getAndUpdate(state -> State.CREATED.equals(state) || State.STOPPED.equals(state) ? State.STARTING : state);
|
||||
|
||||
if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {
|
||||
|
||||
if (getUsePool() && !isRedisClusterAware()) {
|
||||
this.pool = createPool();
|
||||
}
|
||||
|
||||
if (isRedisClusterAware()) {
|
||||
|
||||
this.cluster = createCluster();
|
||||
this.topologyProvider = createTopologyProvider(this.cluster);
|
||||
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
|
||||
new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
|
||||
EXCEPTION_TRANSLATION);
|
||||
}
|
||||
|
||||
state.set(State.STARTED);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
|
||||
|
||||
if (getUsePool() && !isRedisClusterAware()) {
|
||||
if (pool != null) {
|
||||
try {
|
||||
this.pool.close();
|
||||
this.pool = null;
|
||||
} catch (Exception ex) {
|
||||
log.warn("Cannot properly close Jedis pool", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (this.clusterCommandExecutor != null) {
|
||||
try {
|
||||
this.clusterCommandExecutor.destroy();
|
||||
this.clusterCommandExecutor = null;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.cluster != null) {
|
||||
|
||||
this.topologyProvider = null;
|
||||
|
||||
try {
|
||||
this.cluster.close();
|
||||
this.cluster = null;
|
||||
} catch (Exception ex) {
|
||||
log.warn("Cannot properly close Jedis cluster", ex);
|
||||
}
|
||||
}
|
||||
state.set(State.STOPPED);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return State.STARTED.equals(state.get());
|
||||
}
|
||||
|
||||
private Pool<Jedis> createPool() {
|
||||
|
||||
if (isRedisSentinelAware()) {
|
||||
@@ -473,12 +441,14 @@ public class JedisConnectionFactory implements RedisConnectionFactory, Initializ
|
||||
return new JedisCluster(hostAndPort, this.clientConfig, redirects, poolConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
|
||||
stop();
|
||||
state.set(State.DESTROYED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisConnection getConnection() {
|
||||
|
||||
assertInitialized();
|
||||
@@ -498,9 +468,48 @@ public class JedisConnectionFactory implements RedisConnectionFactory, Initializ
|
||||
JedisConnection connection = (getUsePool() ? new JedisConnection(jedis, pool, this.clientConfig, sentinelConfig)
|
||||
: new JedisConnection(jedis, null, this.clientConfig, sentinelConfig));
|
||||
connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
|
||||
|
||||
return postProcessConnection(connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Jedis instance to be used as a Redis connection. The instance can be newly created or retrieved from a
|
||||
* pool.
|
||||
*
|
||||
* @return Jedis instance ready for wrapping into a {@link RedisConnection}.
|
||||
*/
|
||||
protected Jedis fetchJedisConnector() {
|
||||
try {
|
||||
|
||||
if (getUsePool() && pool != null) {
|
||||
return pool.getResource();
|
||||
}
|
||||
|
||||
Jedis jedis = createJedis();
|
||||
// force initialization (see Jedis issue #82)
|
||||
jedis.connect();
|
||||
|
||||
return jedis;
|
||||
} catch (Exception ex) {
|
||||
throw new RedisConnectionFailureException("Cannot get Jedis connection", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private Jedis createJedis() {
|
||||
return new Jedis(new HostAndPort(getHostName(), getPort()), this.clientConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Post process a newly retrieved connection. Useful for decorating or executing initialization commands on a new
|
||||
* connection. This implementation simply returns the connection.
|
||||
*
|
||||
* @param connection the jedis connection.
|
||||
* @return processed connection
|
||||
*/
|
||||
protected JedisConnection postProcessConnection(JedisConnection connection) {
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisClusterConnection getClusterConnection() {
|
||||
|
||||
@@ -509,9 +518,24 @@ public class JedisConnectionFactory implements RedisConnectionFactory, Initializ
|
||||
if (!isRedisClusterAware()) {
|
||||
throw new InvalidDataAccessApiUsageException("Cluster is not configured");
|
||||
}
|
||||
return new JedisClusterConnection(this.cluster, this.clusterCommandExecutor, this.topologyProvider);
|
||||
|
||||
return postProcessConnection(
|
||||
new JedisClusterConnection(this.cluster, this.clusterCommandExecutor, this.topologyProvider));
|
||||
}
|
||||
|
||||
/**
|
||||
* Post process a newly retrieved connection. Useful for decorating or executing initialization commands on a new
|
||||
* connection. This implementation simply returns the connection.
|
||||
*
|
||||
* @param connection the jedis connection.
|
||||
* @return processed connection.
|
||||
* @since 3.2
|
||||
*/
|
||||
protected JedisClusterConnection postProcessConnection(JedisClusterConnection connection) {
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
|
||||
return EXCEPTION_TRANSLATION.translate(ex);
|
||||
}
|
||||
@@ -790,22 +814,24 @@ public class JedisConnectionFactory implements RedisConnectionFactory, Initializ
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies if pipelined results should be converted to the expected data type. If false, results of
|
||||
* Specifies if pipelined results should be converted to the expected data type. If {@code false}, results of
|
||||
* {@link JedisConnection#closePipeline()} and {@link JedisConnection#exec()} will be of the type returned by the
|
||||
* Jedis driver.
|
||||
*
|
||||
* @return Whether or not to convert pipeline and tx results.
|
||||
* @return {@code true} to convert pipeline and transaction results; {@code false} otherwise.
|
||||
*/
|
||||
@Override
|
||||
public boolean getConvertPipelineAndTxResults() {
|
||||
return convertPipelineAndTxResults;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies if pipelined results should be converted to the expected data type. If false, results of
|
||||
* Specifies if pipelined results should be converted to the expected data type. If {@code false}, results of
|
||||
* {@link JedisConnection#closePipeline()} and {@link JedisConnection#exec()} will be of the type returned by the
|
||||
* Jedis driver.
|
||||
*
|
||||
* @param convertPipelineAndTxResults Whether or not to convert pipeline and tx results.
|
||||
* @param convertPipelineAndTxResults {@code true} to convert pipeline and transaction results; {@code false}
|
||||
* otherwise.
|
||||
*/
|
||||
public void setConvertPipelineAndTxResults(boolean convertPipelineAndTxResults) {
|
||||
this.convertPipelineAndTxResults = convertPipelineAndTxResults;
|
||||
@@ -910,7 +936,8 @@ public class JedisConnectionFactory implements RedisConnectionFactory, Initializ
|
||||
}
|
||||
|
||||
switch (current) {
|
||||
case CREATED, STOPPED -> throw new IllegalStateException(String.format("JedisConnectionFactory has been %s. Use start() to initialize it", current));
|
||||
case CREATED, STOPPED -> throw new IllegalStateException(
|
||||
String.format("JedisConnectionFactory has been %s. Use start() to initialize it", current));
|
||||
case DESTROYED -> throw new IllegalStateException(
|
||||
"JedisConnectionFactory was destroyed and cannot be used anymore");
|
||||
default -> throw new IllegalStateException(String.format("JedisConnectionFactory is %s", current));
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Connection package for <a href="https://github.com/xetorthio/jedis">Jedis</a> library.
|
||||
* Connection package for <a href="https://github.com/redis/jedis">Jedis</a> library.
|
||||
*/
|
||||
@org.springframework.lang.NonNullApi
|
||||
@org.springframework.lang.NonNullFields
|
||||
|
||||
@@ -15,53 +15,7 @@
|
||||
*/
|
||||
package org.springframework.data.redis.connection.lettuce;
|
||||
|
||||
import static org.springframework.data.redis.connection.lettuce.LettuceConnection.CODEC;
|
||||
import static org.springframework.data.redis.connection.lettuce.LettuceConnection.PipeliningFlushPolicy;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.dao.DataAccessException;
|
||||
import org.springframework.dao.InvalidDataAccessApiUsageException;
|
||||
import org.springframework.data.redis.ExceptionTranslationStrategy;
|
||||
import org.springframework.data.redis.PassThroughExceptionTranslationStrategy;
|
||||
import org.springframework.data.redis.RedisConnectionFailureException;
|
||||
import org.springframework.data.redis.connection.ClusterCommandExecutor;
|
||||
import org.springframework.data.redis.connection.ClusterTopologyProvider;
|
||||
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.RedisClusterConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisClusterConnection;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration.ClusterConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration.DomainSocketConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration.WithDatabaseIndex;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration.WithPassword;
|
||||
import org.springframework.data.redis.connection.RedisConnection;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.RedisPassword;
|
||||
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisSentinelConnection;
|
||||
import org.springframework.data.redis.connection.RedisSocketConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisStaticMasterReplicaConfiguration;
|
||||
import org.springframework.data.util.Optionals;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
import static org.springframework.data.redis.connection.lettuce.LettuceConnection.*;
|
||||
|
||||
import io.lettuce.core.AbstractRedisClient;
|
||||
import io.lettuce.core.ClientOptions;
|
||||
@@ -78,8 +32,39 @@ import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
|
||||
import io.lettuce.core.codec.RedisCodec;
|
||||
import io.lettuce.core.resource.ClientResources;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.dao.DataAccessException;
|
||||
import org.springframework.dao.InvalidDataAccessApiUsageException;
|
||||
import org.springframework.data.redis.ExceptionTranslationStrategy;
|
||||
import org.springframework.data.redis.PassThroughExceptionTranslationStrategy;
|
||||
import org.springframework.data.redis.RedisConnectionFailureException;
|
||||
import org.springframework.data.redis.connection.*;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration.ClusterConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration.DomainSocketConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration.WithDatabaseIndex;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration.WithPassword;
|
||||
import org.springframework.data.util.Optionals;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Connection factory creating <a href="https://github.com/mp911de/lettuce">Lettuce</a>-based connections.
|
||||
@@ -104,8 +89,9 @@ import org.apache.commons.logging.LogFactory;
|
||||
* <li>{@link RedisClusterConfiguration}</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* This connection factory must be {@link #afterPropertiesSet() initialized} prior to {@link #getConnection obtaining
|
||||
* connections}.
|
||||
* This connection factory must be {@link #afterPropertiesSet() initialized} and {@link SmartLifecycle#start() started}
|
||||
* prior to {@link #getConnection obtaining connections}. You can {@link SmartLifecycle#stop()} and
|
||||
* {@link SmartLifecycle#start() restart} this connection factory if needed.
|
||||
*
|
||||
* @author Costin Leau
|
||||
* @author Jennifer Hickey
|
||||
@@ -125,32 +111,38 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
LettuceExceptionConverter.INSTANCE);
|
||||
|
||||
private final Log log = LogFactory.getLog(getClass());
|
||||
private final LettuceClientConfiguration clientConfiguration;
|
||||
|
||||
private @Nullable AbstractRedisClient client;
|
||||
private @Nullable LettuceConnectionProvider connectionProvider;
|
||||
private @Nullable LettuceConnectionProvider reactiveConnectionProvider;
|
||||
private boolean validateConnection = false;
|
||||
private boolean shareNativeConnection = true;
|
||||
private boolean eagerInitialization = false;
|
||||
private @Nullable SharedConnection<byte[]> connection;
|
||||
private @Nullable SharedConnection<ByteBuffer> reactiveConnection;
|
||||
/** Synchronization monitor for the shared Connection */
|
||||
private final Object connectionMonitor = new Object();
|
||||
private boolean convertPipelineAndTxResults = true;
|
||||
|
||||
private final LettuceClientConfiguration clientConfiguration;
|
||||
|
||||
private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost", 6379);
|
||||
private PipeliningFlushPolicy pipeliningFlushPolicy = PipeliningFlushPolicy.flushEachCommand();
|
||||
|
||||
private @Nullable RedisConfiguration configuration;
|
||||
|
||||
private @Nullable ClusterCommandExecutor clusterCommandExecutor;
|
||||
private boolean validateConnection = false;
|
||||
private boolean shareNativeConnection = true;
|
||||
private boolean eagerInitialization = false;
|
||||
private boolean convertPipelineAndTxResults = true;
|
||||
|
||||
private PipeliningFlushPolicy pipeliningFlushPolicy = PipeliningFlushPolicy.flushEachCommand();
|
||||
|
||||
/**
|
||||
* Lifecycle state of this factory.
|
||||
*/
|
||||
enum State {
|
||||
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
|
||||
}
|
||||
|
||||
private AtomicReference<State> state = new AtomicReference<>(State.CREATED);
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);
|
||||
|
||||
private @Nullable AbstractRedisClient client;
|
||||
private @Nullable LettuceConnectionProvider connectionProvider;
|
||||
private @Nullable LettuceConnectionProvider reactiveConnectionProvider;
|
||||
private @Nullable SharedConnection<byte[]> connection;
|
||||
private @Nullable SharedConnection<ByteBuffer> reactiveConnection;
|
||||
private @Nullable ClusterCommandExecutor clusterCommandExecutor;
|
||||
|
||||
/**
|
||||
* Constructs a new {@link LettuceConnectionFactory} instance with default settings.
|
||||
@@ -341,12 +333,8 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
State current = state.getAndUpdate(state -> {
|
||||
if (State.CREATED.equals(state) || State.STOPPED.equals(state)) {
|
||||
return State.STARTING;
|
||||
}
|
||||
return state;
|
||||
});
|
||||
State current = state
|
||||
.getAndUpdate(state -> State.CREATED.equals(state) || State.STOPPED.equals(state) ? State.STARTING : state);
|
||||
|
||||
if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {
|
||||
|
||||
@@ -376,23 +364,33 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
public void stop() {
|
||||
|
||||
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
|
||||
resetConnection();
|
||||
dispose(connectionProvider);
|
||||
dispose(reactiveConnectionProvider);
|
||||
try {
|
||||
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
|
||||
Duration timeout = clientConfiguration.getShutdownTimeout();
|
||||
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
|
||||
state.set(State.STOPPED);
|
||||
} catch (Exception e) {
|
||||
|
||||
if (log.isWarnEnabled()) {
|
||||
log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient")
|
||||
+ " did not shut down gracefully.", e);
|
||||
resetConnection();
|
||||
|
||||
dispose(connectionProvider);
|
||||
connectionProvider = null;
|
||||
|
||||
dispose(reactiveConnectionProvider);
|
||||
reactiveConnectionProvider = null;
|
||||
|
||||
if (client != null) {
|
||||
|
||||
try {
|
||||
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
|
||||
Duration timeout = clientConfiguration.getShutdownTimeout();
|
||||
|
||||
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
|
||||
client = null;
|
||||
} catch (Exception e) {
|
||||
|
||||
if (log.isWarnEnabled()) {
|
||||
log.warn(ClassUtils.getShortName(client.getClass()) + " did not shut down gracefully.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
state.set(State.STOPPED);
|
||||
}
|
||||
|
||||
state.set(State.STOPPED);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -421,7 +419,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
state.set(State.DESTROYED);
|
||||
}
|
||||
|
||||
private void dispose(LettuceConnectionProvider connectionProvider) {
|
||||
private void dispose(@Nullable LettuceConnectionProvider connectionProvider) {
|
||||
|
||||
if (connectionProvider instanceof DisposableBean) {
|
||||
try {
|
||||
@@ -435,9 +433,10 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisConnection getConnection() {
|
||||
|
||||
assertInitialized();
|
||||
assertStarted();
|
||||
|
||||
if (isClusterAware()) {
|
||||
return getClusterConnection();
|
||||
@@ -452,7 +451,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
@Override
|
||||
public RedisClusterConnection getClusterConnection() {
|
||||
|
||||
assertInitialized();
|
||||
assertStarted();
|
||||
|
||||
if (!isClusterAware()) {
|
||||
throw new InvalidDataAccessApiUsageException("Cluster is not configured");
|
||||
@@ -517,7 +516,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
@Override
|
||||
public LettuceReactiveRedisConnection getReactiveConnection() {
|
||||
|
||||
assertInitialized();
|
||||
assertStarted();
|
||||
|
||||
if (isClusterAware()) {
|
||||
return getReactiveClusterConnection();
|
||||
@@ -531,7 +530,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
@Override
|
||||
public LettuceReactiveRedisClusterConnection getReactiveClusterConnection() {
|
||||
|
||||
assertInitialized();
|
||||
assertStarted();
|
||||
|
||||
if (!isClusterAware()) {
|
||||
throw new InvalidDataAccessApiUsageException("Cluster is not configured");
|
||||
@@ -581,7 +580,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
*/
|
||||
public void validateConnection() {
|
||||
|
||||
assertInitialized();
|
||||
assertStarted();
|
||||
|
||||
getOrCreateSharedConnection().validateConnection();
|
||||
getOrCreateSharedReactiveConnection().validateConnection();
|
||||
@@ -611,6 +610,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
|
||||
return EXCEPTION_TRANSLATION.translate(ex);
|
||||
}
|
||||
@@ -888,7 +888,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
*/
|
||||
@Nullable
|
||||
public AbstractRedisClient getNativeClient() {
|
||||
assertInitialized();
|
||||
assertStarted();
|
||||
return this.client;
|
||||
}
|
||||
|
||||
@@ -1042,22 +1042,24 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies if pipelined results should be converted to the expected data type. If false, results of
|
||||
* Specifies if pipelined results should be converted to the expected data type. If {@code false}, results of
|
||||
* {@link LettuceConnection#closePipeline()} and {LettuceConnection#exec()} will be of the type returned by the
|
||||
* Lettuce driver.
|
||||
*
|
||||
* @return Whether or not to convert pipeline and tx results.
|
||||
* @return {@code true} to convert pipeline and transaction results; {@code false} otherwise.
|
||||
*/
|
||||
@Override
|
||||
public boolean getConvertPipelineAndTxResults() {
|
||||
return convertPipelineAndTxResults;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies if pipelined and transaction results should be converted to the expected data type. If false, results of
|
||||
* {@link LettuceConnection#closePipeline()} and {LettuceConnection#exec()} will be of the type returned by the
|
||||
* Lettuce driver.
|
||||
* Specifies if pipelined and transaction results should be converted to the expected data type. If {@code false},
|
||||
* results of {@link LettuceConnection#closePipeline()} and {LettuceConnection#exec()} will be of the type returned by
|
||||
* the Lettuce driver.
|
||||
*
|
||||
* @param convertPipelineAndTxResults Whether or not to convert pipeline and tx results.
|
||||
* @param convertPipelineAndTxResults {@code true} to convert pipeline and transaction results; {@code false}
|
||||
* otherwise.
|
||||
*/
|
||||
public void setConvertPipelineAndTxResults(boolean convertPipelineAndTxResults) {
|
||||
this.convertPipelineAndTxResults = convertPipelineAndTxResults;
|
||||
@@ -1156,8 +1158,8 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
|
||||
|
||||
return isStaticMasterReplicaAware() ? createStaticMasterReplicaConnectionProvider((RedisClient) client, codec)
|
||||
: isClusterAware() ? createClusterConnectionProvider((RedisClusterClient) client, codec)
|
||||
: createStandaloneConnectionProvider((RedisClient) client, codec);
|
||||
: isClusterAware() ? createClusterConnectionProvider((RedisClusterClient) client, codec)
|
||||
: createStandaloneConnectionProvider((RedisClient) client, codec);
|
||||
}
|
||||
|
||||
@SuppressWarnings("all")
|
||||
@@ -1165,9 +1167,8 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
RedisCodec<?, ?> codec) {
|
||||
|
||||
List<RedisURI> nodes = ((RedisStaticMasterReplicaConfiguration) this.configuration).getNodes().stream()
|
||||
.map(it -> createRedisURIAndApplySettings(it.getHostName(), it.getPort()))
|
||||
.peek(it -> it.setDatabase(getDatabase()))
|
||||
.collect(Collectors.toList());
|
||||
.map(it -> createRedisURIAndApplySettings(it.getHostName(), it.getPort()))
|
||||
.peek(it -> it.setDatabase(getDatabase())).collect(Collectors.toList());
|
||||
|
||||
return new StaticMasterReplicaConnectionProvider(client, codec, nodes,
|
||||
getClientConfiguration().getReadFrom().orElse(null));
|
||||
@@ -1184,15 +1185,13 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
protected AbstractRedisClient createClient() {
|
||||
|
||||
return isStaticMasterReplicaAware() ? createStaticMasterReplicaClient()
|
||||
: isRedisSentinelAware() ? createSentinelClient()
|
||||
: isClusterAware() ? createClusterClient()
|
||||
: createBasicClient();
|
||||
: isRedisSentinelAware() ? createSentinelClient()
|
||||
: isClusterAware() ? createClusterClient() : createBasicClient();
|
||||
}
|
||||
|
||||
private RedisClient createStaticMasterReplicaClient() {
|
||||
|
||||
RedisClient redisClient = this.clientConfiguration.getClientResources()
|
||||
.map(RedisClient::create)
|
||||
RedisClient redisClient = this.clientConfiguration.getClientResources().map(RedisClient::create)
|
||||
.orElseGet(RedisClient::create);
|
||||
|
||||
this.clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
|
||||
@@ -1251,8 +1250,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
|
||||
ClusterConfiguration configuration = (ClusterConfiguration) this.configuration;
|
||||
|
||||
configuration.getClusterNodes().stream()
|
||||
.map(node -> createRedisURIAndApplySettings(node.getHost(), node.getPort()))
|
||||
configuration.getClusterNodes().stream().map(node -> createRedisURIAndApplySettings(node.getHost(), node.getPort()))
|
||||
.forEach(initialUris::add);
|
||||
|
||||
RedisClusterClient clusterClient = this.clientConfiguration.getClientResources()
|
||||
@@ -1268,12 +1266,9 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
|
||||
Optional<ClientOptions> clientOptions = this.clientConfiguration.getClientOptions();
|
||||
|
||||
ClusterClientOptions clusterClientOptions = clientOptions
|
||||
.filter(ClusterClientOptions.class::isInstance)
|
||||
.map(ClusterClientOptions.class::cast)
|
||||
.orElseGet(() -> clientOptions
|
||||
.map(it -> ClusterClientOptions.builder(it).build())
|
||||
.orElseGet(ClusterClientOptions::create));
|
||||
ClusterClientOptions clusterClientOptions = clientOptions.filter(ClusterClientOptions.class::isInstance)
|
||||
.map(ClusterClientOptions.class::cast).orElseGet(() -> clientOptions
|
||||
.map(it -> ClusterClientOptions.builder(it).build()).orElseGet(ClusterClientOptions::create));
|
||||
|
||||
if (configuration.getMaxRedirects() != null) {
|
||||
return clusterClientOptions.mutate().maxRedirects(configuration.getMaxRedirects()).build();
|
||||
@@ -1290,15 +1285,14 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
: createRedisURIAndApplySettings(getHostName(), getPort());
|
||||
|
||||
RedisClient redisClient = this.clientConfiguration.getClientResources()
|
||||
.map(clientResources -> RedisClient.create(clientResources, uri))
|
||||
.orElseGet(() -> RedisClient.create(uri));
|
||||
.map(clientResources -> RedisClient.create(clientResources, uri)).orElseGet(() -> RedisClient.create(uri));
|
||||
|
||||
this.clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
|
||||
|
||||
return redisClient;
|
||||
}
|
||||
|
||||
private void assertInitialized() {
|
||||
private void assertStarted() {
|
||||
|
||||
State current = state.get();
|
||||
|
||||
@@ -1307,7 +1301,8 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
}
|
||||
|
||||
switch (current) {
|
||||
case CREATED, STOPPED -> throw new IllegalStateException(String.format("LettuceConnectionFactory has been %s. Use start() to initialize it", current));
|
||||
case CREATED, STOPPED -> throw new IllegalStateException(
|
||||
String.format("LettuceConnectionFactory has been %s. Use start() to initialize it", current));
|
||||
case DESTROYED -> throw new IllegalStateException(
|
||||
"LettuceConnectionFactory was destroyed and cannot be used anymore");
|
||||
default -> throw new IllegalStateException(String.format("LettuceConnectionFactory is %s", current));
|
||||
@@ -1359,14 +1354,14 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
getRedisPassword().toOptional().ifPresent(builder::withPassword);
|
||||
}
|
||||
|
||||
clientConfiguration.getRedisCredentialsProviderFactory().ifPresent(factory ->
|
||||
builder.withAuthentication(factory.createCredentialsProvider(this.configuration)));
|
||||
clientConfiguration.getRedisCredentialsProviderFactory()
|
||||
.ifPresent(factory -> builder.withAuthentication(factory.createCredentialsProvider(this.configuration)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisSentinelConnection getSentinelConnection() {
|
||||
|
||||
assertInitialized();
|
||||
assertStarted();
|
||||
|
||||
return new LettuceSentinelConnection(connectionProvider);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user