Polishing.
Move executor from ClusterConfiguration to connection factories as the executor is a Spring concept that isn't tied to endpoint details or the client config. Reorder static factory methods after constructors and property accessors after static factory methods. Inline single-line single-use methods that aren't intended as extension hooks for easier readability. Disable TaskExecutor disposal on ClusterCommandExecutor.destroy(). Remove NonNull annotations as default non-nullability is defined on the package level. Simplify tests to use integration tests to avoid excessive mocking. See #2594 Original pull request: #2669
This commit is contained in:
@@ -32,16 +32,14 @@ import org.springframework.data.redis.ExceptionTranslationStrategy;
|
||||
import org.springframework.data.redis.TooManyClusterRedirectionsException;
|
||||
import org.springframework.data.redis.connection.util.ByteArraySet;
|
||||
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
|
||||
import org.springframework.lang.NonNull;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* {@link ClusterCommandExecutor} takes care of running commands across the known cluster nodes. By providing an
|
||||
* {@link AsyncTaskExecutor} the execution behavior can be influenced.
|
||||
* {@link AsyncTaskExecutor} the execution behavior can be configured.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
@@ -49,8 +47,6 @@ import org.springframework.util.ObjectUtils;
|
||||
*/
|
||||
public class ClusterCommandExecutor implements DisposableBean {
|
||||
|
||||
protected static final AsyncTaskExecutor DEFAULT_TASK_EXECUTOR = new SimpleAsyncTaskExecutor();
|
||||
|
||||
private int maxRedirects = 5;
|
||||
|
||||
private final AsyncTaskExecutor executor;
|
||||
@@ -71,14 +67,14 @@ public class ClusterCommandExecutor implements DisposableBean {
|
||||
public ClusterCommandExecutor(ClusterTopologyProvider topologyProvider, ClusterNodeResourceProvider resourceProvider,
|
||||
ExceptionTranslationStrategy exceptionTranslation) {
|
||||
|
||||
this(topologyProvider, resourceProvider, exceptionTranslation, DEFAULT_TASK_EXECUTOR);
|
||||
this(topologyProvider, resourceProvider, exceptionTranslation, new SimpleAsyncTaskExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param topologyProvider must not be {@literal null}.
|
||||
* @param resourceProvider must not be {@literal null}.
|
||||
* @param exceptionTranslation must not be {@literal null}.
|
||||
* @param executor can be {@literal null}. Defaulted to {@link ThreadPoolTaskExecutor}.
|
||||
* @param executor the task executor to null, defaults to {@link SimpleAsyncTaskExecutor} if {@literal null}.
|
||||
*/
|
||||
public ClusterCommandExecutor(ClusterTopologyProvider topologyProvider, ClusterNodeResourceProvider resourceProvider,
|
||||
ExceptionTranslationStrategy exceptionTranslation, @Nullable AsyncTaskExecutor executor) {
|
||||
@@ -90,11 +86,7 @@ public class ClusterCommandExecutor implements DisposableBean {
|
||||
this.topologyProvider = topologyProvider;
|
||||
this.resourceProvider = resourceProvider;
|
||||
this.exceptionTranslationStrategy = exceptionTranslation;
|
||||
this.executor = resolveTaskExecutor(executor);
|
||||
}
|
||||
|
||||
private @NonNull AsyncTaskExecutor resolveTaskExecutor(@Nullable AsyncTaskExecutor taskExecutor) {
|
||||
return taskExecutor != null ? taskExecutor : DEFAULT_TASK_EXECUTOR;
|
||||
this.executor = executor != null ? executor : new SimpleAsyncTaskExecutor();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -149,9 +141,8 @@ public class ClusterCommandExecutor implements DisposableBean {
|
||||
RuntimeException translatedException = convertToDataAccessException(cause);
|
||||
|
||||
if (translatedException instanceof ClusterRedirectException clusterRedirectException) {
|
||||
return executeCommandOnSingleNode(cmd, topologyProvider.getTopology()
|
||||
.lookup(clusterRedirectException.getTargetHost(), clusterRedirectException.getTargetPort()),
|
||||
redirectCount + 1);
|
||||
return executeCommandOnSingleNode(cmd, topologyProvider.getTopology().lookup(
|
||||
clusterRedirectException.getTargetHost(), clusterRedirectException.getTargetPort()), redirectCount + 1);
|
||||
} else {
|
||||
throw translatedException != null ? translatedException : cause;
|
||||
}
|
||||
@@ -182,7 +173,7 @@ public class ClusterCommandExecutor implements DisposableBean {
|
||||
* @param cmd must not be {@literal null}.
|
||||
* @return never {@literal null}.
|
||||
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
|
||||
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
|
||||
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
|
||||
*/
|
||||
public <S, T> MultiNodeResult<T> executeCommandOnAllNodes(final ClusterCommandCallback<S, T> cmd) {
|
||||
return executeCommandAsyncOnNodes(cmd, getClusterTopology().getActiveMasterNodes());
|
||||
@@ -193,7 +184,7 @@ public class ClusterCommandExecutor implements DisposableBean {
|
||||
* @param nodes must not be {@literal null}.
|
||||
* @return never {@literal null}.
|
||||
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
|
||||
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
|
||||
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
|
||||
* @throws IllegalArgumentException in case the node could not be resolved to a topology-known node
|
||||
*/
|
||||
public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallback<S, T> callback,
|
||||
@@ -295,7 +286,7 @@ public class ClusterCommandExecutor implements DisposableBean {
|
||||
* @param commandCallback must not be {@literal null}.
|
||||
* @return never {@literal null}.
|
||||
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
|
||||
* {@link MultiKeyClusterCommandCallback command}.
|
||||
* {@link MultiKeyClusterCommandCallback command}.
|
||||
*/
|
||||
public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCallback<S, T> commandCallback,
|
||||
Iterable<byte[]> keys) {
|
||||
@@ -315,8 +306,8 @@ public class ClusterCommandExecutor implements DisposableBean {
|
||||
|
||||
if (entry.getKey().isMaster()) {
|
||||
for (PositionalKey key : entry.getValue()) {
|
||||
futures.put(new NodeExecution(entry.getKey(), key), this.executor.submit(() ->
|
||||
executeMultiKeyCommandOnSingleNode(commandCallback, entry.getKey(), key.getBytes())));
|
||||
futures.put(new NodeExecution(entry.getKey(), key), this.executor
|
||||
.submit(() -> executeMultiKeyCommandOnSingleNode(commandCallback, entry.getKey(), key.getBytes())));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -367,10 +358,6 @@ public class ClusterCommandExecutor implements DisposableBean {
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
|
||||
if (this.executor instanceof DisposableBean disposableBean) {
|
||||
disposableBean.destroy();
|
||||
}
|
||||
|
||||
if (this.resourceProvider instanceof DisposableBean disposableBean) {
|
||||
disposableBean.destroy();
|
||||
}
|
||||
|
||||
@@ -15,8 +15,6 @@
|
||||
*/
|
||||
package org.springframework.data.redis.connection;
|
||||
|
||||
import static org.springframework.util.StringUtils.commaDelimitedListToSet;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
@@ -26,7 +24,6 @@ import java.util.Set;
|
||||
|
||||
import org.springframework.core.env.MapPropertySource;
|
||||
import org.springframework.core.env.PropertySource;
|
||||
import org.springframework.core.task.AsyncTaskExecutor;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration.ClusterConfiguration;
|
||||
import org.springframework.data.redis.util.RedisAssertions;
|
||||
import org.springframework.lang.Nullable;
|
||||
@@ -36,8 +33,8 @@ import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Configuration class used to set up a {@link RedisConnection} via {@link RedisConnectionFactory} for connecting
|
||||
* to <a href="https://redis.io/topics/cluster-spec">Redis Cluster</a>. Useful when setting up a highly available Redis
|
||||
* Configuration class used to set up a {@link RedisConnection} via {@link RedisConnectionFactory} for connecting to
|
||||
* <a href="https://redis.io/topics/cluster-spec">Redis Cluster</a>. Useful when setting up a highly available Redis
|
||||
* environment.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
@@ -52,8 +49,6 @@ public class RedisClusterConfiguration implements RedisConfiguration, ClusterCon
|
||||
|
||||
private @Nullable Integer maxRedirects;
|
||||
|
||||
private @Nullable AsyncTaskExecutor executor;
|
||||
|
||||
private RedisPassword password = RedisPassword.none();
|
||||
|
||||
private final Set<RedisNode> clusterNodes;
|
||||
@@ -103,10 +98,12 @@ public class RedisClusterConfiguration implements RedisConfiguration, ClusterCon
|
||||
this.clusterNodes = new LinkedHashSet<>();
|
||||
|
||||
if (propertySource.containsProperty(REDIS_CLUSTER_NODES_CONFIG_PROPERTY)) {
|
||||
|
||||
Object redisClusterNodes = propertySource.getProperty(REDIS_CLUSTER_NODES_CONFIG_PROPERTY);
|
||||
appendClusterNodes(commaDelimitedListToSet(String.valueOf(redisClusterNodes)));
|
||||
appendClusterNodes(StringUtils.commaDelimitedListToSet(String.valueOf(redisClusterNodes)));
|
||||
}
|
||||
if (propertySource.containsProperty(REDIS_CLUSTER_MAX_REDIRECTS_CONFIG_PROPERTY)) {
|
||||
|
||||
Object clusterMaxRedirects = propertySource.getProperty(REDIS_CLUSTER_MAX_REDIRECTS_CONFIG_PROPERTY);
|
||||
this.maxRedirects = NumberUtils.parseNumber(String.valueOf(clusterMaxRedirects), Integer.class);
|
||||
}
|
||||
@@ -204,16 +201,6 @@ public class RedisClusterConfiguration implements RedisConfiguration, ClusterCon
|
||||
return password;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAsyncTaskExecutor(@Nullable AsyncTaskExecutor executor) {
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
@Nullable @Override
|
||||
public AsyncTaskExecutor getAsyncTaskExecutor() {
|
||||
return this.executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(@Nullable Object obj) {
|
||||
|
||||
@@ -226,9 +213,9 @@ public class RedisClusterConfiguration implements RedisConfiguration, ClusterCon
|
||||
}
|
||||
|
||||
return ObjectUtils.nullSafeEquals(this.clusterNodes, that.clusterNodes)
|
||||
&& ObjectUtils.nullSafeEquals(this.maxRedirects, that.maxRedirects)
|
||||
&& ObjectUtils.nullSafeEquals(this.username, that.username)
|
||||
&& ObjectUtils.nullSafeEquals(this.password, that.password);
|
||||
&& ObjectUtils.nullSafeEquals(this.maxRedirects, that.maxRedirects)
|
||||
&& ObjectUtils.nullSafeEquals(this.username, that.username)
|
||||
&& ObjectUtils.nullSafeEquals(this.password, that.password);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -21,7 +21,6 @@ import java.util.Set;
|
||||
import java.util.function.IntSupplier;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.springframework.core.task.AsyncTaskExecutor;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
@@ -346,20 +345,6 @@ public interface RedisConfiguration {
|
||||
*/
|
||||
interface ClusterConfiguration extends WithPassword {
|
||||
|
||||
/**
|
||||
* Configures the {@link AsyncTaskExecutor} used to execute commands asynchronously across the cluster.
|
||||
*
|
||||
* @param executor {@link AsyncTaskExecutor} used to execute commands asynchronously across the cluster.
|
||||
*/
|
||||
void setAsyncTaskExecutor(AsyncTaskExecutor executor);
|
||||
|
||||
/**
|
||||
* Returns the configured {@link AsyncTaskExecutor} used to execute commands asynchronously across the cluster.
|
||||
*
|
||||
* @return the configured {@link AsyncTaskExecutor} used to execute commands asynchronously across the cluster.
|
||||
*/
|
||||
AsyncTaskExecutor getAsyncTaskExecutor();
|
||||
|
||||
/**
|
||||
* Returns an {@link Collections#unmodifiableSet(Set) Set} of {@link RedisNode cluster nodes}.
|
||||
*
|
||||
|
||||
@@ -100,8 +100,8 @@ public class JedisConnectionFactory
|
||||
|
||||
private static final Log log = LogFactory.getLog(JedisConnectionFactory.class);
|
||||
|
||||
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION =
|
||||
new PassThroughExceptionTranslationStrategy(JedisExceptionConverter.INSTANCE);
|
||||
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
|
||||
JedisExceptionConverter.INSTANCE);
|
||||
|
||||
private boolean convertPipelineAndTxResults = true;
|
||||
|
||||
@@ -111,6 +111,8 @@ public class JedisConnectionFactory
|
||||
|
||||
private @Nullable ClusterCommandExecutor clusterCommandExecutor;
|
||||
|
||||
private @Nullable AsyncTaskExecutor executor;
|
||||
|
||||
private @Nullable ClusterTopologyProvider topologyProvider;
|
||||
|
||||
private JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().build();
|
||||
@@ -123,8 +125,8 @@ public class JedisConnectionFactory
|
||||
|
||||
private @Nullable RedisConfiguration configuration;
|
||||
|
||||
private RedisStandaloneConfiguration standaloneConfig =
|
||||
new RedisStandaloneConfiguration("localhost", Protocol.DEFAULT_PORT);
|
||||
private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost",
|
||||
Protocol.DEFAULT_PORT);
|
||||
|
||||
/**
|
||||
* Lifecycle state of this factory.
|
||||
@@ -248,7 +250,7 @@ public class JedisConnectionFactory
|
||||
|
||||
this.configuration = sentinelConfiguration;
|
||||
this.clientConfiguration = MutableJedisClientConfiguration
|
||||
.create(poolConfig != null ? poolConfig : new JedisPoolConfig());
|
||||
.create(poolConfig != null ? poolConfig : new JedisPoolConfig());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -279,339 +281,26 @@ public class JedisConnectionFactory
|
||||
this.standaloneConfig = standaloneConfiguration;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
protected ClusterCommandExecutor getClusterCommandExecutor() {
|
||||
ClusterCommandExecutor getRequiredClusterCommandExecutor() {
|
||||
|
||||
if (this.clusterCommandExecutor == null) {
|
||||
throw new IllegalStateException("ClusterCommandExecutor not initialized");
|
||||
}
|
||||
|
||||
return this.clusterCommandExecutor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
|
||||
this.clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
|
||||
|
||||
if (isAutoStartup()) {
|
||||
start();
|
||||
}
|
||||
}
|
||||
|
||||
private JedisClientConfig createClientConfig(int database, @Nullable String username, RedisPassword password) {
|
||||
|
||||
DefaultJedisClientConfig.Builder builder = DefaultJedisClientConfig.builder();
|
||||
|
||||
this.clientConfiguration.getClientName().ifPresent(builder::clientName);
|
||||
builder.connectionTimeoutMillis(getConnectTimeout());
|
||||
builder.socketTimeoutMillis(getReadTimeout());
|
||||
|
||||
builder.database(database);
|
||||
|
||||
if (!ObjectUtils.isEmpty(username)) {
|
||||
builder.user(username);
|
||||
}
|
||||
password.toOptional().map(String::new).ifPresent(builder::password);
|
||||
|
||||
if (isUseSsl()) {
|
||||
|
||||
builder.ssl(true);
|
||||
|
||||
this.clientConfiguration.getSslSocketFactory().ifPresent(builder::sslSocketFactory);
|
||||
this.clientConfiguration.getHostnameVerifier().ifPresent(builder::hostnameVerifier);
|
||||
this.clientConfiguration.getSslParameters().ifPresent(builder::sslParameters);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
JedisClientConfig createSentinelClientConfig(SentinelConfiguration sentinelConfiguration) {
|
||||
return createClientConfig(0, sentinelConfiguration.getSentinelUsername(),
|
||||
sentinelConfiguration.getSentinelPassword());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
State current = this.state.getAndUpdate(state -> isCreatedOrStopped(state) ? State.STARTING : state);
|
||||
|
||||
if (isCreatedOrStopped(current)) {
|
||||
|
||||
if (getUsePool() && !isRedisClusterAware()) {
|
||||
this.pool = createPool();
|
||||
}
|
||||
|
||||
if (isRedisClusterAware()) {
|
||||
this.cluster = createCluster();
|
||||
this.topologyProvider = createTopologyProvider(this.cluster);
|
||||
this.clusterCommandExecutor = newClusterCommandExecutor();
|
||||
}
|
||||
|
||||
this.state.set(State.STARTED);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isCreatedOrStopped(@Nullable State state) {
|
||||
return State.CREATED.equals(state) || State.STOPPED.equals(state);
|
||||
}
|
||||
|
||||
private ClusterCommandExecutor newClusterCommandExecutor() {
|
||||
|
||||
return new ClusterCommandExecutor(this.topologyProvider, newClusterNodeResourceProvider(),
|
||||
EXCEPTION_TRANSLATION, resolveTaskExecutor(this.configuration));
|
||||
}
|
||||
|
||||
private ClusterNodeResourceProvider newClusterNodeResourceProvider() {
|
||||
return new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private AsyncTaskExecutor resolveTaskExecutor(@Nullable RedisConfiguration redisConfiguration) {
|
||||
|
||||
return redisConfiguration instanceof RedisConfiguration.ClusterConfiguration clusterConfiguration
|
||||
? clusterConfiguration.getAsyncTaskExecutor()
|
||||
: null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
if (this.state.compareAndSet(State.STARTED, State.STOPPING)) {
|
||||
|
||||
if (getUsePool() && !isRedisClusterAware()) {
|
||||
if (this.pool != null) {
|
||||
try {
|
||||
this.pool.close();
|
||||
this.pool = null;
|
||||
} catch (Exception ex) {
|
||||
log.warn("Cannot properly close Jedis pool", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ClusterCommandExecutor clusterCommandExecutor = this.clusterCommandExecutor;
|
||||
|
||||
if (clusterCommandExecutor != null) {
|
||||
try {
|
||||
clusterCommandExecutor.destroy();
|
||||
this.clusterCommandExecutor = null;
|
||||
} catch (Exception cause) {
|
||||
throw new RuntimeException(cause);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.cluster != null) {
|
||||
|
||||
this.topologyProvider = null;
|
||||
|
||||
try {
|
||||
this.cluster.close();
|
||||
this.cluster = null;
|
||||
} catch (Exception cause) {
|
||||
log.warn("Cannot properly close Jedis cluster", cause);
|
||||
}
|
||||
}
|
||||
|
||||
this.state.set(State.STOPPED);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return this.phase;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify the lifecycle phase for pausing and resuming this executor. The default is {@code 0}.
|
||||
* Configures the {@link AsyncTaskExecutor executor} used to execute commands asynchronously across the cluster.
|
||||
*
|
||||
* @since 3.2
|
||||
* @see SmartLifecycle#getPhase()
|
||||
*/
|
||||
public void setPhase(int phase) {
|
||||
this.phase = phase;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return State.STARTED.equals(this.state.get());
|
||||
}
|
||||
|
||||
private Pool<Jedis> createPool() {
|
||||
|
||||
if (isRedisSentinelAware()) {
|
||||
return createRedisSentinelPool((RedisSentinelConfiguration) this.configuration);
|
||||
}
|
||||
return createRedisPool();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates {@link JedisSentinelPool}.
|
||||
*
|
||||
* @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}.
|
||||
* @return the {@link Pool} to use. Never {@literal null}.
|
||||
* @since 1.4
|
||||
*/
|
||||
protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config) {
|
||||
|
||||
GenericObjectPoolConfig<Jedis> poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
|
||||
|
||||
JedisClientConfig sentinelConfig = createSentinelClientConfig(config);
|
||||
|
||||
return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
|
||||
poolConfig, this.clientConfig, sentinelConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates {@link JedisPool}.
|
||||
*
|
||||
* @return the {@link Pool} to use. Never {@literal null}.
|
||||
* @since 1.4
|
||||
*/
|
||||
protected Pool<Jedis> createRedisPool() {
|
||||
return new JedisPool(getPoolConfig(), new HostAndPort(getHostName(), getPort()), this.clientConfig);
|
||||
}
|
||||
|
||||
JedisCluster createCluster() {
|
||||
return createCluster((RedisClusterConfiguration) this.configuration, getPoolConfig());
|
||||
}
|
||||
|
||||
/**
|
||||
* Template method to create a {@link ClusterTopologyProvider} given {@link JedisCluster}. Creates
|
||||
* {@link JedisClusterTopologyProvider} by default.
|
||||
*
|
||||
* @param cluster the {@link JedisCluster}, must not be {@literal null}.
|
||||
* @return the {@link ClusterTopologyProvider}.
|
||||
* @see JedisClusterTopologyProvider
|
||||
* @see 2.2
|
||||
*/
|
||||
protected ClusterTopologyProvider createTopologyProvider(JedisCluster cluster) {
|
||||
return new JedisClusterTopologyProvider(cluster);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates {@link JedisCluster} for given {@link RedisClusterConfiguration} and {@link GenericObjectPoolConfig}.
|
||||
*
|
||||
* @param clusterConfig must not be {@literal null}.
|
||||
* @param poolConfig can be {@literal null}.
|
||||
* @return the actual {@link JedisCluster}.
|
||||
* @since 1.7
|
||||
*/
|
||||
protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig,
|
||||
GenericObjectPoolConfig<Connection> poolConfig) {
|
||||
|
||||
Assert.notNull(clusterConfig, "Cluster configuration must not be null");
|
||||
|
||||
Set<HostAndPort> hostAndPort = new HashSet<>();
|
||||
|
||||
for (RedisNode node : clusterConfig.getClusterNodes()) {
|
||||
hostAndPort.add(new HostAndPort(node.getHost(), node.getPort()));
|
||||
}
|
||||
|
||||
int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects() : 5;
|
||||
|
||||
return new JedisCluster(hostAndPort, this.clientConfig, redirects, poolConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
|
||||
stop();
|
||||
state.set(State.DESTROYED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisConnection getConnection() {
|
||||
|
||||
assertInitialized();
|
||||
|
||||
if (isRedisClusterAware()) {
|
||||
return getClusterConnection();
|
||||
}
|
||||
|
||||
Jedis jedis = fetchJedisConnector();
|
||||
JedisClientConfig sentinelConfig = this.clientConfig;
|
||||
|
||||
SentinelConfiguration sentinelConfiguration = getSentinelConfiguration();
|
||||
|
||||
if (sentinelConfiguration != null) {
|
||||
sentinelConfig = createSentinelClientConfig(sentinelConfiguration);
|
||||
}
|
||||
|
||||
JedisConnection connection = getUsePool()
|
||||
? new JedisConnection(jedis, this.pool, this.clientConfig, sentinelConfig)
|
||||
: new JedisConnection(jedis, null, this.clientConfig, sentinelConfig);
|
||||
|
||||
connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
|
||||
|
||||
return postProcessConnection(connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Jedis instance to be used as a Redis connection. The instance can be newly created or retrieved from a
|
||||
* pool.
|
||||
*
|
||||
* @return Jedis instance ready for wrapping into a {@link RedisConnection}.
|
||||
*/
|
||||
protected Jedis fetchJedisConnector() {
|
||||
|
||||
try {
|
||||
|
||||
if (getUsePool() && this.pool != null) {
|
||||
return this.pool.getResource();
|
||||
}
|
||||
|
||||
Jedis jedis = createJedis();
|
||||
|
||||
// force initialization (see Jedis issue #82)
|
||||
jedis.connect();
|
||||
|
||||
return jedis;
|
||||
} catch (Exception cause) {
|
||||
throw new RedisConnectionFailureException("Cannot get Jedis connection", cause);
|
||||
}
|
||||
}
|
||||
|
||||
private Jedis createJedis() {
|
||||
return new Jedis(new HostAndPort(getHostName(), getPort()), this.clientConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Post process a newly retrieved connection. Useful for decorating or executing initialization commands on a new
|
||||
* connection. This implementation simply returns the connection.
|
||||
*
|
||||
* @param connection the jedis connection.
|
||||
* @return processed connection
|
||||
*/
|
||||
protected JedisConnection postProcessConnection(JedisConnection connection) {
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisClusterConnection getClusterConnection() {
|
||||
|
||||
assertInitialized();
|
||||
|
||||
if (!isRedisClusterAware()) {
|
||||
throw new InvalidDataAccessApiUsageException("Cluster is not configured");
|
||||
}
|
||||
|
||||
JedisClusterConnection clusterConnection =
|
||||
new JedisClusterConnection(this.cluster, getClusterCommandExecutor(), this.topologyProvider);
|
||||
|
||||
return postProcessConnection(clusterConnection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Post process a newly retrieved connection. Useful for decorating or executing initialization commands on a new
|
||||
* connection. This implementation simply returns the connection.
|
||||
*
|
||||
* @param connection the jedis connection.
|
||||
* @return processed connection.
|
||||
* @param executor {@link AsyncTaskExecutor executor} used to execute commands asynchronously across the cluster.
|
||||
* @since 3.2
|
||||
*/
|
||||
protected JedisClusterConnection postProcessConnection(JedisClusterConnection connection) {
|
||||
return connection;
|
||||
}
|
||||
public void setExecutor(AsyncTaskExecutor executor) {
|
||||
|
||||
@Override
|
||||
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
|
||||
return EXCEPTION_TRANSLATION.translate(ex);
|
||||
Assert.notNull(executor, "AsyncTaskExecutor must not be null");
|
||||
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -922,6 +611,316 @@ public class JedisConnectionFactory
|
||||
return RedisConfiguration.isClusterConfiguration(configuration);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
|
||||
this.clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
|
||||
|
||||
if (isAutoStartup()) {
|
||||
start();
|
||||
}
|
||||
}
|
||||
|
||||
private JedisClientConfig createClientConfig(int database, @Nullable String username, RedisPassword password) {
|
||||
|
||||
DefaultJedisClientConfig.Builder builder = DefaultJedisClientConfig.builder();
|
||||
|
||||
this.clientConfiguration.getClientName().ifPresent(builder::clientName);
|
||||
builder.connectionTimeoutMillis(getConnectTimeout());
|
||||
builder.socketTimeoutMillis(getReadTimeout());
|
||||
|
||||
builder.database(database);
|
||||
|
||||
if (!ObjectUtils.isEmpty(username)) {
|
||||
builder.user(username);
|
||||
}
|
||||
password.toOptional().map(String::new).ifPresent(builder::password);
|
||||
|
||||
if (isUseSsl()) {
|
||||
|
||||
builder.ssl(true);
|
||||
|
||||
this.clientConfiguration.getSslSocketFactory().ifPresent(builder::sslSocketFactory);
|
||||
this.clientConfiguration.getHostnameVerifier().ifPresent(builder::hostnameVerifier);
|
||||
this.clientConfiguration.getSslParameters().ifPresent(builder::sslParameters);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
JedisClientConfig createSentinelClientConfig(SentinelConfiguration sentinelConfiguration) {
|
||||
return createClientConfig(0, sentinelConfiguration.getSentinelUsername(),
|
||||
sentinelConfiguration.getSentinelPassword());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
State current = this.state.getAndUpdate(state -> isCreatedOrStopped(state) ? State.STARTING : state);
|
||||
|
||||
if (isCreatedOrStopped(current)) {
|
||||
|
||||
if (getUsePool() && !isRedisClusterAware()) {
|
||||
this.pool = createPool();
|
||||
}
|
||||
|
||||
if (isRedisClusterAware()) {
|
||||
|
||||
this.cluster = createCluster(getClusterConfiguration(), getPoolConfig());
|
||||
this.topologyProvider = createTopologyProvider(this.cluster);
|
||||
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
|
||||
new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
|
||||
EXCEPTION_TRANSLATION, executor);
|
||||
}
|
||||
|
||||
this.state.set(State.STARTED);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isCreatedOrStopped(@Nullable State state) {
|
||||
return State.CREATED.equals(state) || State.STOPPED.equals(state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
if (this.state.compareAndSet(State.STARTED, State.STOPPING)) {
|
||||
|
||||
if (getUsePool() && !isRedisClusterAware()) {
|
||||
if (this.pool != null) {
|
||||
try {
|
||||
this.pool.close();
|
||||
this.pool = null;
|
||||
} catch (Exception ex) {
|
||||
log.warn("Cannot properly close Jedis pool", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ClusterCommandExecutor clusterCommandExecutor = this.clusterCommandExecutor;
|
||||
|
||||
if (clusterCommandExecutor != null) {
|
||||
try {
|
||||
clusterCommandExecutor.destroy();
|
||||
this.clusterCommandExecutor = null;
|
||||
} catch (Exception cause) {
|
||||
throw new RuntimeException(cause);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.cluster != null) {
|
||||
|
||||
this.topologyProvider = null;
|
||||
|
||||
try {
|
||||
this.cluster.close();
|
||||
this.cluster = null;
|
||||
} catch (Exception cause) {
|
||||
log.warn("Cannot properly close Jedis cluster", cause);
|
||||
}
|
||||
}
|
||||
|
||||
this.state.set(State.STOPPED);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return this.phase;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify the lifecycle phase for pausing and resuming this executor. The default is {@code 0}.
|
||||
*
|
||||
* @since 3.2
|
||||
* @see SmartLifecycle#getPhase()
|
||||
*/
|
||||
public void setPhase(int phase) {
|
||||
this.phase = phase;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return State.STARTED.equals(this.state.get());
|
||||
}
|
||||
|
||||
private Pool<Jedis> createPool() {
|
||||
|
||||
if (isRedisSentinelAware()) {
|
||||
return createRedisSentinelPool(getSentinelConfiguration());
|
||||
}
|
||||
return createRedisPool();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates {@link JedisSentinelPool}.
|
||||
*
|
||||
* @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}.
|
||||
* @return the {@link Pool} to use. Never {@literal null}.
|
||||
* @since 1.4
|
||||
*/
|
||||
protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config) {
|
||||
|
||||
GenericObjectPoolConfig<Jedis> poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
|
||||
|
||||
JedisClientConfig sentinelConfig = createSentinelClientConfig(config);
|
||||
|
||||
return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
|
||||
poolConfig, this.clientConfig, sentinelConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates {@link JedisPool}.
|
||||
*
|
||||
* @return the {@link Pool} to use. Never {@literal null}.
|
||||
* @since 1.4
|
||||
*/
|
||||
protected Pool<Jedis> createRedisPool() {
|
||||
return new JedisPool(getPoolConfig(), new HostAndPort(getHostName(), getPort()), this.clientConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Template method to create a {@link ClusterTopologyProvider} given {@link JedisCluster}. Creates
|
||||
* {@link JedisClusterTopologyProvider} by default.
|
||||
*
|
||||
* @param cluster the {@link JedisCluster}, must not be {@literal null}.
|
||||
* @return the {@link ClusterTopologyProvider}.
|
||||
* @see JedisClusterTopologyProvider
|
||||
* @see 2.2
|
||||
*/
|
||||
protected ClusterTopologyProvider createTopologyProvider(JedisCluster cluster) {
|
||||
return new JedisClusterTopologyProvider(cluster);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates {@link JedisCluster} for given {@link RedisClusterConfiguration} and {@link GenericObjectPoolConfig}.
|
||||
*
|
||||
* @param clusterConfig must not be {@literal null}.
|
||||
* @param poolConfig can be {@literal null}.
|
||||
* @return the actual {@link JedisCluster}.
|
||||
* @since 1.7
|
||||
*/
|
||||
protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig,
|
||||
GenericObjectPoolConfig<Connection> poolConfig) {
|
||||
|
||||
Assert.notNull(clusterConfig, "Cluster configuration must not be null");
|
||||
|
||||
Set<HostAndPort> hostAndPort = new HashSet<>();
|
||||
|
||||
for (RedisNode node : clusterConfig.getClusterNodes()) {
|
||||
hostAndPort.add(new HostAndPort(node.getHost(), node.getPort()));
|
||||
}
|
||||
|
||||
int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects() : 5;
|
||||
|
||||
return new JedisCluster(hostAndPort, this.clientConfig, redirects, poolConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
|
||||
stop();
|
||||
state.set(State.DESTROYED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisConnection getConnection() {
|
||||
|
||||
assertInitialized();
|
||||
|
||||
if (isRedisClusterAware()) {
|
||||
return getClusterConnection();
|
||||
}
|
||||
|
||||
Jedis jedis = fetchJedisConnector();
|
||||
JedisClientConfig sentinelConfig = this.clientConfig;
|
||||
|
||||
SentinelConfiguration sentinelConfiguration = getSentinelConfiguration();
|
||||
|
||||
if (sentinelConfiguration != null) {
|
||||
sentinelConfig = createSentinelClientConfig(sentinelConfiguration);
|
||||
}
|
||||
|
||||
JedisConnection connection = getUsePool() ? new JedisConnection(jedis, this.pool, this.clientConfig, sentinelConfig)
|
||||
: new JedisConnection(jedis, null, this.clientConfig, sentinelConfig);
|
||||
|
||||
connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
|
||||
|
||||
return postProcessConnection(connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Jedis instance to be used as a Redis connection. The instance can be newly created or retrieved from a
|
||||
* pool.
|
||||
*
|
||||
* @return Jedis instance ready for wrapping into a {@link RedisConnection}.
|
||||
*/
|
||||
protected Jedis fetchJedisConnector() {
|
||||
|
||||
try {
|
||||
|
||||
if (getUsePool() && this.pool != null) {
|
||||
return this.pool.getResource();
|
||||
}
|
||||
|
||||
Jedis jedis = createJedis();
|
||||
|
||||
// force initialization (see Jedis issue #82)
|
||||
jedis.connect();
|
||||
|
||||
return jedis;
|
||||
} catch (Exception cause) {
|
||||
throw new RedisConnectionFailureException("Cannot get Jedis connection", cause);
|
||||
}
|
||||
}
|
||||
|
||||
private Jedis createJedis() {
|
||||
return new Jedis(new HostAndPort(getHostName(), getPort()), this.clientConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Post process a newly retrieved connection. Useful for decorating or executing initialization commands on a new
|
||||
* connection. This implementation simply returns the connection.
|
||||
*
|
||||
* @param connection the jedis connection.
|
||||
* @return processed connection
|
||||
*/
|
||||
protected JedisConnection postProcessConnection(JedisConnection connection) {
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisClusterConnection getClusterConnection() {
|
||||
|
||||
assertInitialized();
|
||||
|
||||
if (!isRedisClusterAware()) {
|
||||
throw new InvalidDataAccessApiUsageException("Cluster is not configured");
|
||||
}
|
||||
|
||||
JedisClusterConnection clusterConnection = new JedisClusterConnection(this.cluster,
|
||||
getRequiredClusterCommandExecutor(), this.topologyProvider);
|
||||
|
||||
return postProcessConnection(clusterConnection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Post process a newly retrieved connection. Useful for decorating or executing initialization commands on a new
|
||||
* connection. This implementation simply returns the connection.
|
||||
*
|
||||
* @param connection the jedis connection.
|
||||
* @return processed connection.
|
||||
* @since 3.2
|
||||
*/
|
||||
protected JedisClusterConnection postProcessConnection(JedisClusterConnection connection) {
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
|
||||
return EXCEPTION_TRANSLATION.translate(ex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisSentinelConnection getSentinelConnection() {
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
*/
|
||||
package org.springframework.data.redis.connection.lettuce;
|
||||
|
||||
import static org.springframework.data.redis.connection.lettuce.LettuceConnection.PipeliningFlushPolicy;
|
||||
import static org.springframework.data.redis.connection.lettuce.LettuceConnection.*;
|
||||
|
||||
import io.lettuce.core.AbstractRedisClient;
|
||||
import io.lettuce.core.ClientOptions;
|
||||
@@ -55,23 +55,10 @@ import org.springframework.dao.InvalidDataAccessApiUsageException;
|
||||
import org.springframework.data.redis.ExceptionTranslationStrategy;
|
||||
import org.springframework.data.redis.PassThroughExceptionTranslationStrategy;
|
||||
import org.springframework.data.redis.RedisConnectionFailureException;
|
||||
import org.springframework.data.redis.connection.ClusterCommandExecutor;
|
||||
import org.springframework.data.redis.connection.ClusterTopologyProvider;
|
||||
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.RedisClusterConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisClusterConnection;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration;
|
||||
import org.springframework.data.redis.connection.*;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration.ClusterConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration.WithDatabaseIndex;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration.WithPassword;
|
||||
import org.springframework.data.redis.connection.RedisConnection;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.RedisPassword;
|
||||
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisSentinelConnection;
|
||||
import org.springframework.data.redis.connection.RedisSocketConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisStaticMasterReplicaConfiguration;
|
||||
import org.springframework.data.util.Optionals;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
@@ -124,57 +111,8 @@ import org.springframework.util.StringUtils;
|
||||
public class LettuceConnectionFactory implements RedisConnectionFactory, ReactiveRedisConnectionFactory,
|
||||
InitializingBean, DisposableBean, SmartLifecycle {
|
||||
|
||||
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION =
|
||||
new PassThroughExceptionTranslationStrategy(LettuceExceptionConverter.INSTANCE);
|
||||
|
||||
/**
|
||||
* Creates a {@link RedisConfiguration} based on a {@link String URI} according to the following:
|
||||
* <ul>
|
||||
* <li>If {@code redisUri} contains sentinels, a {@link RedisSentinelConfiguration} is returned</li>
|
||||
* <li>If {@code redisUri} has a configured socket a {@link RedisSocketConfiguration} is returned</li>
|
||||
* <li>Otherwise a {@link RedisStandaloneConfiguration} is returned</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param redisUri the connection URI in the format of a {@link RedisURI}.
|
||||
* @return an appropriate {@link RedisConfiguration} instance representing the Redis URI.
|
||||
* @since 2.5.3
|
||||
* @see #createRedisConfiguration(RedisURI)
|
||||
* @see RedisURI
|
||||
*/
|
||||
public static RedisConfiguration createRedisConfiguration(String redisUri) {
|
||||
|
||||
Assert.hasText(redisUri, "RedisURI must not be null or empty");
|
||||
|
||||
return createRedisConfiguration(RedisURI.create(redisUri));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@link RedisConfiguration} based on a {@link RedisURI} according to the following:
|
||||
* <ul>
|
||||
* <li>If {@link RedisURI} contains sentinels, a {@link RedisSentinelConfiguration} is returned</li>
|
||||
* <li>If {@link RedisURI} has a configured socket a {@link RedisSocketConfiguration} is returned</li>
|
||||
* <li>Otherwise a {@link RedisStandaloneConfiguration} is returned</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param redisUri the connection URI.
|
||||
* @return an appropriate {@link RedisConfiguration} instance representing the Redis URI.
|
||||
* @since 2.5.3
|
||||
* @see RedisURI
|
||||
*/
|
||||
public static RedisConfiguration createRedisConfiguration(RedisURI redisUri) {
|
||||
|
||||
Assert.notNull(redisUri, "RedisURI must not be null");
|
||||
|
||||
if (!ObjectUtils.isEmpty(redisUri.getSentinels())) {
|
||||
return LettuceConverters.createRedisSentinelConfiguration(redisUri);
|
||||
}
|
||||
|
||||
if (!ObjectUtils.isEmpty(redisUri.getSocket())) {
|
||||
return LettuceConverters.createRedisSocketConfiguration(redisUri);
|
||||
}
|
||||
|
||||
return LettuceConverters.createRedisStandaloneConfiguration(redisUri);
|
||||
}
|
||||
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
|
||||
LettuceExceptionConverter.INSTANCE);
|
||||
|
||||
private boolean validateConnection = false;
|
||||
private boolean shareNativeConnection = true;
|
||||
@@ -189,6 +127,8 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
|
||||
private @Nullable ClusterCommandExecutor clusterCommandExecutor;
|
||||
|
||||
private @Nullable AsyncTaskExecutor executor;
|
||||
|
||||
private final LettuceClientConfiguration clientConfiguration;
|
||||
|
||||
private @Nullable LettuceConnectionProvider connectionProvider;
|
||||
@@ -203,8 +143,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
|
||||
private @Nullable RedisConfiguration configuration;
|
||||
|
||||
private RedisStandaloneConfiguration standaloneConfig =
|
||||
new RedisStandaloneConfiguration("localhost", 6379);
|
||||
private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost", 6379);
|
||||
|
||||
private @Nullable SharedConnection<byte[]> connection;
|
||||
private @Nullable SharedConnection<ByteBuffer> reactiveConnection;
|
||||
@@ -355,344 +294,75 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
this.configuration = this.standaloneConfig;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
protected ClusterCommandExecutor getClusterCommandExecutor() {
|
||||
/**
|
||||
* Creates a {@link RedisConfiguration} based on a {@link String URI} according to the following:
|
||||
* <ul>
|
||||
* <li>If {@code redisUri} contains sentinels, a {@link RedisSentinelConfiguration} is returned</li>
|
||||
* <li>If {@code redisUri} has a configured socket a {@link RedisSocketConfiguration} is returned</li>
|
||||
* <li>Otherwise a {@link RedisStandaloneConfiguration} is returned</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param redisUri the connection URI in the format of a {@link RedisURI}.
|
||||
* @return an appropriate {@link RedisConfiguration} instance representing the Redis URI.
|
||||
* @since 2.5.3
|
||||
* @see #createRedisConfiguration(RedisURI)
|
||||
* @see RedisURI
|
||||
*/
|
||||
public static RedisConfiguration createRedisConfiguration(String redisUri) {
|
||||
|
||||
Assert.hasText(redisUri, "RedisURI must not be null or empty");
|
||||
|
||||
return createRedisConfiguration(RedisURI.create(redisUri));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@link RedisConfiguration} based on a {@link RedisURI} according to the following:
|
||||
* <ul>
|
||||
* <li>If {@link RedisURI} contains sentinels, a {@link RedisSentinelConfiguration} is returned</li>
|
||||
* <li>If {@link RedisURI} has a configured socket a {@link RedisSocketConfiguration} is returned</li>
|
||||
* <li>Otherwise a {@link RedisStandaloneConfiguration} is returned</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param redisUri the connection URI.
|
||||
* @return an appropriate {@link RedisConfiguration} instance representing the Redis URI.
|
||||
* @since 2.5.3
|
||||
* @see RedisURI
|
||||
*/
|
||||
public static RedisConfiguration createRedisConfiguration(RedisURI redisUri) {
|
||||
|
||||
Assert.notNull(redisUri, "RedisURI must not be null");
|
||||
|
||||
if (!ObjectUtils.isEmpty(redisUri.getSentinels())) {
|
||||
return LettuceConverters.createRedisSentinelConfiguration(redisUri);
|
||||
}
|
||||
|
||||
if (!ObjectUtils.isEmpty(redisUri.getSocket())) {
|
||||
return LettuceConverters.createRedisSocketConfiguration(redisUri);
|
||||
}
|
||||
|
||||
return LettuceConverters.createRedisStandaloneConfiguration(redisUri);
|
||||
}
|
||||
|
||||
ClusterCommandExecutor getRequiredClusterCommandExecutor() {
|
||||
|
||||
if (this.clusterCommandExecutor == null) {
|
||||
throw new IllegalStateException("ClusterCommandExecutor not initialized");
|
||||
}
|
||||
|
||||
return this.clusterCommandExecutor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
State current = this.state.getAndUpdate(state -> isCreatedOrStopped(state) ? State.STARTING : state);
|
||||
|
||||
if (isCreatedOrStopped(current)) {
|
||||
|
||||
this.client = createClient();
|
||||
this.connectionProvider = newExceptionTranslatingConnectionProvider(this.client, LettuceConnection.CODEC);
|
||||
this.reactiveConnectionProvider = newExceptionTranslatingConnectionProvider(this.client,
|
||||
LettuceReactiveRedisConnection.CODEC);
|
||||
|
||||
if (isClusterAware()) {
|
||||
this.clusterCommandExecutor = newClusterCommandExecutor();
|
||||
}
|
||||
|
||||
this.state.set(State.STARTED);
|
||||
|
||||
if (getEagerInitialization() && getShareNativeConnection()) {
|
||||
initConnection();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isCreatedOrStopped(@Nullable State state) {
|
||||
return State.CREATED.equals(state) || State.STOPPED.equals(state);
|
||||
}
|
||||
|
||||
private ClusterCommandExecutor newClusterCommandExecutor() {
|
||||
|
||||
return new ClusterCommandExecutor(newClusterTopologyProvider(), newClusterNodeResourceProvider(),
|
||||
EXCEPTION_TRANSLATION, resolveTaskExecutor(this.configuration));
|
||||
}
|
||||
|
||||
private LettuceClusterConnection.LettuceClusterNodeResourceProvider newClusterNodeResourceProvider() {
|
||||
return new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider);
|
||||
}
|
||||
|
||||
private LettuceClusterTopologyProvider newClusterTopologyProvider() {
|
||||
return new LettuceClusterTopologyProvider((RedisClusterClient) this.client);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private AsyncTaskExecutor resolveTaskExecutor(RedisConfiguration redisConfiguration) {
|
||||
|
||||
return redisConfiguration instanceof ClusterConfiguration clusterConfiguration
|
||||
? clusterConfiguration.getAsyncTaskExecutor()
|
||||
: null;
|
||||
}
|
||||
|
||||
private ExceptionTranslatingConnectionProvider newExceptionTranslatingConnectionProvider(AbstractRedisClient client,
|
||||
RedisCodec<?, ?> codec) {
|
||||
|
||||
return new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, codec));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
|
||||
|
||||
resetConnection();
|
||||
|
||||
dispose(connectionProvider);
|
||||
connectionProvider = null;
|
||||
|
||||
dispose(reactiveConnectionProvider);
|
||||
reactiveConnectionProvider = null;
|
||||
|
||||
if (client != null) {
|
||||
|
||||
try {
|
||||
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
|
||||
Duration timeout = clientConfiguration.getShutdownTimeout();
|
||||
|
||||
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
|
||||
client = null;
|
||||
} catch (Exception cause) {
|
||||
if (log.isWarnEnabled()) {
|
||||
log.warn(ClassUtils.getShortName(client.getClass()) + " did not shut down gracefully.", cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state.set(State.STOPPED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return phase;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify the lifecycle phase for pausing and resuming this executor. The default is {@code 0}.
|
||||
* Configures the {@link AsyncTaskExecutor executor} used to execute commands asynchronously across the cluster.
|
||||
*
|
||||
* @param executor {@link AsyncTaskExecutor executor} used to execute commands asynchronously across the cluster.
|
||||
* @since 3.2
|
||||
* @see SmartLifecycle#getPhase()
|
||||
*/
|
||||
public void setPhase(int phase) {
|
||||
this.phase = phase;
|
||||
}
|
||||
public void setExecutor(AsyncTaskExecutor executor) {
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return State.STARTED.equals(this.state.get());
|
||||
}
|
||||
Assert.notNull(executor, "AsyncTaskExecutor must not be null");
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
if (isAutoStartup()) {
|
||||
start();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
|
||||
stop();
|
||||
this.client = null;
|
||||
|
||||
ClusterCommandExecutor clusterCommandExecutor = getClusterCommandExecutor();
|
||||
|
||||
if (clusterCommandExecutor != null) {
|
||||
try {
|
||||
clusterCommandExecutor.destroy();
|
||||
this.clusterCommandExecutor = null;
|
||||
} catch (Exception cause) {
|
||||
log.warn("Cannot properly close cluster command executor", cause);
|
||||
}
|
||||
}
|
||||
|
||||
this.state.set(State.DESTROYED);
|
||||
}
|
||||
|
||||
private void dispose(@Nullable LettuceConnectionProvider connectionProvider) {
|
||||
|
||||
if (connectionProvider instanceof DisposableBean disposableBean) {
|
||||
try {
|
||||
disposableBean.destroy();
|
||||
} catch (Exception cause) {
|
||||
if (log.isWarnEnabled()) {
|
||||
log.warn(connectionProvider + " did not shut down gracefully.", cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisConnection getConnection() {
|
||||
|
||||
assertStarted();
|
||||
|
||||
if (isClusterAware()) {
|
||||
return getClusterConnection();
|
||||
}
|
||||
|
||||
LettuceConnection connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider,
|
||||
getTimeout(), getDatabase());
|
||||
|
||||
connection.setConvertPipelineAndTxResults(this.convertPipelineAndTxResults);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisClusterConnection getClusterConnection() {
|
||||
|
||||
assertStarted();
|
||||
|
||||
if (!isClusterAware()) {
|
||||
throw new InvalidDataAccessApiUsageException("Cluster is not configured");
|
||||
}
|
||||
|
||||
RedisClusterClient clusterClient = (RedisClusterClient) client;
|
||||
|
||||
StatefulRedisClusterConnection<byte[], byte[]> sharedConnection = getSharedClusterConnection();
|
||||
|
||||
LettuceClusterTopologyProvider topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
|
||||
|
||||
return doCreateLettuceClusterConnection(sharedConnection, this.connectionProvider, topologyProvider,
|
||||
getClusterCommandExecutor(), this.clientConfiguration.getCommandTimeout());
|
||||
}
|
||||
|
||||
/**
|
||||
* Customization hook for {@link LettuceConnection} creation.
|
||||
*
|
||||
* @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
|
||||
* {@literal true}; {@literal null} otherwise.
|
||||
* @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
|
||||
* @param timeout command timeout in {@link TimeUnit#MILLISECONDS}.
|
||||
* @param database database index to operate on.
|
||||
* @return the {@link LettuceConnection}.
|
||||
* @throws IllegalArgumentException if a required parameter is {@literal null}.
|
||||
* @since 2.2
|
||||
*/
|
||||
protected LettuceConnection doCreateLettuceConnection(
|
||||
@Nullable StatefulRedisConnection<byte[], byte[]> sharedConnection, LettuceConnectionProvider connectionProvider,
|
||||
long timeout, int database) {
|
||||
|
||||
LettuceConnection connection = new LettuceConnection(sharedConnection, connectionProvider, timeout, database);
|
||||
|
||||
connection.setPipeliningFlushPolicy(this.pipeliningFlushPolicy);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Customization hook for {@link LettuceClusterConnection} creation.
|
||||
*
|
||||
* @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
|
||||
* {@literal true}; {@literal null} otherwise.
|
||||
* @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
|
||||
* @param topologyProvider the {@link ClusterTopologyProvider}.
|
||||
* @param clusterCommandExecutor the {@link ClusterCommandExecutor} to release connections.
|
||||
* @param commandTimeout command timeout {@link Duration}.
|
||||
* @return the {@link LettuceConnection}.
|
||||
* @throws IllegalArgumentException if a required parameter is {@literal null}.
|
||||
* @since 2.2
|
||||
*/
|
||||
protected LettuceClusterConnection doCreateLettuceClusterConnection(
|
||||
@Nullable StatefulRedisClusterConnection<byte[], byte[]> sharedConnection,
|
||||
LettuceConnectionProvider connectionProvider, ClusterTopologyProvider topologyProvider,
|
||||
ClusterCommandExecutor clusterCommandExecutor, Duration commandTimeout) {
|
||||
|
||||
LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnection, connectionProvider,
|
||||
topologyProvider, clusterCommandExecutor, commandTimeout);
|
||||
|
||||
connection.setPipeliningFlushPolicy(this.pipeliningFlushPolicy);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LettuceReactiveRedisConnection getReactiveConnection() {
|
||||
|
||||
assertStarted();
|
||||
|
||||
if (isClusterAware()) {
|
||||
return getReactiveClusterConnection();
|
||||
}
|
||||
|
||||
return getShareNativeConnection()
|
||||
? new LettuceReactiveRedisConnection(getSharedReactiveConnection(), reactiveConnectionProvider)
|
||||
: new LettuceReactiveRedisConnection(reactiveConnectionProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LettuceReactiveRedisClusterConnection getReactiveClusterConnection() {
|
||||
|
||||
assertStarted();
|
||||
|
||||
if (!isClusterAware()) {
|
||||
throw new InvalidDataAccessApiUsageException("Cluster is not configured");
|
||||
}
|
||||
|
||||
RedisClusterClient client = (RedisClusterClient) this.client;
|
||||
|
||||
return getShareNativeConnection()
|
||||
? new LettuceReactiveRedisClusterConnection(getSharedReactiveConnection(), reactiveConnectionProvider, client)
|
||||
: new LettuceReactiveRedisClusterConnection(reactiveConnectionProvider, client);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the shared connection if {@link #getShareNativeConnection() native connection sharing} is enabled and
|
||||
* reset any previously existing connection.
|
||||
*/
|
||||
public void initConnection() {
|
||||
|
||||
resetConnection();
|
||||
|
||||
if (isClusterAware()) {
|
||||
getSharedClusterConnection();
|
||||
} else {
|
||||
getSharedConnection();
|
||||
}
|
||||
|
||||
getSharedReactiveConnection();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the underlying shared Connection, to be reinitialized on next access.
|
||||
*/
|
||||
public void resetConnection() {
|
||||
|
||||
Optionals.toStream(Optional.ofNullable(connection), Optional.ofNullable(reactiveConnection))
|
||||
.forEach(SharedConnection::resetConnection);
|
||||
|
||||
synchronized (this.connectionMonitor) {
|
||||
|
||||
this.connection = null;
|
||||
this.reactiveConnection = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate the shared connections and reinitialize if invalid.
|
||||
*/
|
||||
public void validateConnection() {
|
||||
|
||||
assertStarted();
|
||||
|
||||
getOrCreateSharedConnection().validateConnection();
|
||||
getOrCreateSharedReactiveConnection().validateConnection();
|
||||
}
|
||||
|
||||
private SharedConnection<byte[]> getOrCreateSharedConnection() {
|
||||
|
||||
synchronized (this.connectionMonitor) {
|
||||
|
||||
if (this.connection == null) {
|
||||
this.connection = new SharedConnection<>(connectionProvider);
|
||||
}
|
||||
|
||||
return this.connection;
|
||||
}
|
||||
}
|
||||
|
||||
private SharedConnection<ByteBuffer> getOrCreateSharedReactiveConnection() {
|
||||
|
||||
synchronized (this.connectionMonitor) {
|
||||
|
||||
if (this.reactiveConnection == null) {
|
||||
this.reactiveConnection = new SharedConnection<>(reactiveConnectionProvider);
|
||||
}
|
||||
|
||||
return this.reactiveConnection;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
|
||||
return EXCEPTION_TRANSLATION.translate(ex);
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -987,8 +657,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
|
||||
AbstractRedisClient client = getNativeClient();
|
||||
|
||||
Assert.state(client != null,
|
||||
"Client not yet initialized; Did you forget to call initialize the bean");
|
||||
Assert.state(client != null, "Client not yet initialized; Did you forget to call initialize the bean");
|
||||
|
||||
return client;
|
||||
}
|
||||
@@ -1059,6 +728,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
* @return {@literal null} if not set.
|
||||
* @since 1.7
|
||||
*/
|
||||
@Nullable
|
||||
public ClientResources getClientResources() {
|
||||
return clientConfiguration.getClientResources().orElse(null);
|
||||
}
|
||||
@@ -1176,6 +846,324 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
return RedisConfiguration.isClusterConfiguration(configuration);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
State current = this.state.getAndUpdate(state -> isCreatedOrStopped(state) ? State.STARTING : state);
|
||||
|
||||
if (isCreatedOrStopped(current)) {
|
||||
|
||||
AbstractRedisClient client = createClient();
|
||||
this.client = client;
|
||||
LettuceConnectionProvider connectionProvider = new ExceptionTranslatingConnectionProvider(
|
||||
createConnectionProvider(this.client, CODEC));
|
||||
this.connectionProvider = connectionProvider;
|
||||
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
|
||||
createConnectionProvider(this.client, LettuceReactiveRedisConnection.CODEC));
|
||||
|
||||
if (isClusterAware()) {
|
||||
this.clusterCommandExecutor = createClusterCommandExecutor((RedisClusterClient) client, connectionProvider);
|
||||
}
|
||||
|
||||
this.state.set(State.STARTED);
|
||||
|
||||
if (getEagerInitialization() && getShareNativeConnection()) {
|
||||
initConnection();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isCreatedOrStopped(@Nullable State state) {
|
||||
return State.CREATED.equals(state) || State.STOPPED.equals(state);
|
||||
}
|
||||
|
||||
private ClusterCommandExecutor createClusterCommandExecutor(RedisClusterClient client,
|
||||
LettuceConnectionProvider connectionProvider) {
|
||||
|
||||
return new ClusterCommandExecutor(new LettuceClusterTopologyProvider(client),
|
||||
new LettuceClusterConnection.LettuceClusterNodeResourceProvider(connectionProvider), EXCEPTION_TRANSLATION,
|
||||
this.executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
|
||||
|
||||
resetConnection();
|
||||
|
||||
dispose(connectionProvider);
|
||||
connectionProvider = null;
|
||||
|
||||
dispose(reactiveConnectionProvider);
|
||||
reactiveConnectionProvider = null;
|
||||
|
||||
if (client != null) {
|
||||
|
||||
try {
|
||||
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
|
||||
Duration timeout = clientConfiguration.getShutdownTimeout();
|
||||
|
||||
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
|
||||
client = null;
|
||||
} catch (Exception cause) {
|
||||
if (log.isWarnEnabled()) {
|
||||
log.warn(ClassUtils.getShortName(client.getClass()) + " did not shut down gracefully.", cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state.set(State.STOPPED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return phase;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify the lifecycle phase for pausing and resuming this executor. The default is {@code 0}.
|
||||
*
|
||||
* @since 3.2
|
||||
* @see SmartLifecycle#getPhase()
|
||||
*/
|
||||
public void setPhase(int phase) {
|
||||
this.phase = phase;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return State.STARTED.equals(this.state.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
if (isAutoStartup()) {
|
||||
start();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
|
||||
stop();
|
||||
this.client = null;
|
||||
|
||||
ClusterCommandExecutor clusterCommandExecutor = this.clusterCommandExecutor;
|
||||
|
||||
if (clusterCommandExecutor != null) {
|
||||
try {
|
||||
clusterCommandExecutor.destroy();
|
||||
this.clusterCommandExecutor = null;
|
||||
} catch (Exception cause) {
|
||||
log.warn("Cannot properly close cluster command executor", cause);
|
||||
}
|
||||
}
|
||||
|
||||
this.state.set(State.DESTROYED);
|
||||
}
|
||||
|
||||
private void dispose(@Nullable LettuceConnectionProvider connectionProvider) {
|
||||
|
||||
if (connectionProvider instanceof DisposableBean disposableBean) {
|
||||
try {
|
||||
disposableBean.destroy();
|
||||
} catch (Exception cause) {
|
||||
if (log.isWarnEnabled()) {
|
||||
log.warn(connectionProvider + " did not shut down gracefully.", cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisConnection getConnection() {
|
||||
|
||||
assertStarted();
|
||||
|
||||
if (isClusterAware()) {
|
||||
return getClusterConnection();
|
||||
}
|
||||
|
||||
LettuceConnection connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider, getTimeout(),
|
||||
getDatabase());
|
||||
|
||||
connection.setConvertPipelineAndTxResults(this.convertPipelineAndTxResults);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisClusterConnection getClusterConnection() {
|
||||
|
||||
assertStarted();
|
||||
|
||||
if (!isClusterAware()) {
|
||||
throw new InvalidDataAccessApiUsageException("Cluster is not configured");
|
||||
}
|
||||
|
||||
RedisClusterClient clusterClient = (RedisClusterClient) client;
|
||||
|
||||
StatefulRedisClusterConnection<byte[], byte[]> sharedConnection = getSharedClusterConnection();
|
||||
|
||||
LettuceClusterTopologyProvider topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
|
||||
|
||||
return doCreateLettuceClusterConnection(sharedConnection, this.connectionProvider, topologyProvider,
|
||||
getRequiredClusterCommandExecutor(), this.clientConfiguration.getCommandTimeout());
|
||||
}
|
||||
|
||||
/**
|
||||
* Customization hook for {@link LettuceConnection} creation.
|
||||
*
|
||||
* @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
|
||||
* {@literal true}; {@literal null} otherwise.
|
||||
* @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
|
||||
* @param timeout command timeout in {@link TimeUnit#MILLISECONDS}.
|
||||
* @param database database index to operate on.
|
||||
* @return the {@link LettuceConnection}.
|
||||
* @throws IllegalArgumentException if a required parameter is {@literal null}.
|
||||
* @since 2.2
|
||||
*/
|
||||
protected LettuceConnection doCreateLettuceConnection(
|
||||
@Nullable StatefulRedisConnection<byte[], byte[]> sharedConnection, LettuceConnectionProvider connectionProvider,
|
||||
long timeout, int database) {
|
||||
|
||||
LettuceConnection connection = new LettuceConnection(sharedConnection, connectionProvider, timeout, database);
|
||||
|
||||
connection.setPipeliningFlushPolicy(this.pipeliningFlushPolicy);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Customization hook for {@link LettuceClusterConnection} creation.
|
||||
*
|
||||
* @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
|
||||
* {@literal true}; {@literal null} otherwise.
|
||||
* @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
|
||||
* @param topologyProvider the {@link ClusterTopologyProvider}.
|
||||
* @param clusterCommandExecutor the {@link ClusterCommandExecutor} to release connections.
|
||||
* @param commandTimeout command timeout {@link Duration}.
|
||||
* @return the {@link LettuceConnection}.
|
||||
* @throws IllegalArgumentException if a required parameter is {@literal null}.
|
||||
* @since 2.2
|
||||
*/
|
||||
protected LettuceClusterConnection doCreateLettuceClusterConnection(
|
||||
@Nullable StatefulRedisClusterConnection<byte[], byte[]> sharedConnection,
|
||||
LettuceConnectionProvider connectionProvider, ClusterTopologyProvider topologyProvider,
|
||||
ClusterCommandExecutor clusterCommandExecutor, Duration commandTimeout) {
|
||||
|
||||
LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnection, connectionProvider,
|
||||
topologyProvider, clusterCommandExecutor, commandTimeout);
|
||||
|
||||
connection.setPipeliningFlushPolicy(this.pipeliningFlushPolicy);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LettuceReactiveRedisConnection getReactiveConnection() {
|
||||
|
||||
assertStarted();
|
||||
|
||||
if (isClusterAware()) {
|
||||
return getReactiveClusterConnection();
|
||||
}
|
||||
|
||||
return getShareNativeConnection()
|
||||
? new LettuceReactiveRedisConnection(getSharedReactiveConnection(), reactiveConnectionProvider)
|
||||
: new LettuceReactiveRedisConnection(reactiveConnectionProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LettuceReactiveRedisClusterConnection getReactiveClusterConnection() {
|
||||
|
||||
assertStarted();
|
||||
|
||||
if (!isClusterAware()) {
|
||||
throw new InvalidDataAccessApiUsageException("Cluster is not configured");
|
||||
}
|
||||
|
||||
RedisClusterClient client = (RedisClusterClient) this.client;
|
||||
|
||||
return getShareNativeConnection()
|
||||
? new LettuceReactiveRedisClusterConnection(getSharedReactiveConnection(), reactiveConnectionProvider, client)
|
||||
: new LettuceReactiveRedisClusterConnection(reactiveConnectionProvider, client);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the shared connection if {@link #getShareNativeConnection() native connection sharing} is enabled and
|
||||
* reset any previously existing connection.
|
||||
*/
|
||||
public void initConnection() {
|
||||
|
||||
resetConnection();
|
||||
|
||||
if (isClusterAware()) {
|
||||
getSharedClusterConnection();
|
||||
} else {
|
||||
getSharedConnection();
|
||||
}
|
||||
|
||||
getSharedReactiveConnection();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the underlying shared Connection, to be reinitialized on next access.
|
||||
*/
|
||||
public void resetConnection() {
|
||||
|
||||
Optionals.toStream(Optional.ofNullable(connection), Optional.ofNullable(reactiveConnection))
|
||||
.forEach(SharedConnection::resetConnection);
|
||||
|
||||
synchronized (this.connectionMonitor) {
|
||||
|
||||
this.connection = null;
|
||||
this.reactiveConnection = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate the shared connections and reinitialize if invalid.
|
||||
*/
|
||||
public void validateConnection() {
|
||||
|
||||
assertStarted();
|
||||
|
||||
getOrCreateSharedConnection().validateConnection();
|
||||
getOrCreateSharedReactiveConnection().validateConnection();
|
||||
}
|
||||
|
||||
private SharedConnection<byte[]> getOrCreateSharedConnection() {
|
||||
|
||||
synchronized (this.connectionMonitor) {
|
||||
|
||||
if (this.connection == null) {
|
||||
this.connection = new SharedConnection<>(connectionProvider);
|
||||
}
|
||||
|
||||
return this.connection;
|
||||
}
|
||||
}
|
||||
|
||||
private SharedConnection<ByteBuffer> getOrCreateSharedReactiveConnection() {
|
||||
|
||||
synchronized (this.connectionMonitor) {
|
||||
|
||||
if (this.reactiveConnection == null) {
|
||||
this.reactiveConnection = new SharedConnection<>(reactiveConnectionProvider);
|
||||
}
|
||||
|
||||
return this.reactiveConnection;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
|
||||
return EXCEPTION_TRANSLATION.translate(ex);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the shared connection using {@literal byte[]} encoding for imperative API use. {@literal null} if
|
||||
* {@link #getShareNativeConnection() connection sharing} is disabled or when connected to Redis Cluster.
|
||||
@@ -1239,7 +1227,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
|
||||
return isStaticMasterReplicaAware() ? createStaticMasterReplicaConnectionProvider((RedisClient) client, codec)
|
||||
: isClusterAware() ? createClusterConnectionProvider((RedisClusterClient) client, codec)
|
||||
: createStandaloneConnectionProvider((RedisClient) client, codec);
|
||||
: createStandaloneConnectionProvider((RedisClient) client, codec);
|
||||
}
|
||||
|
||||
@SuppressWarnings("all")
|
||||
@@ -1248,8 +1236,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
|
||||
List<RedisURI> nodes = ((RedisStaticMasterReplicaConfiguration) this.configuration).getNodes().stream()
|
||||
.map(it -> createRedisURIAndApplySettings(it.getHostName(), it.getPort()))
|
||||
.peek(it -> it.setDatabase(getDatabase()))
|
||||
.collect(Collectors.toList());
|
||||
.peek(it -> it.setDatabase(getDatabase())).collect(Collectors.toList());
|
||||
|
||||
return new StaticMasterReplicaConnectionProvider(client, codec, nodes,
|
||||
getClientConfiguration().getReadFrom().orElse(null));
|
||||
@@ -1267,14 +1254,12 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
|
||||
return isStaticMasterReplicaAware() ? createStaticMasterReplicaClient()
|
||||
: isRedisSentinelAware() ? createSentinelClient()
|
||||
: isClusterAware() ? createClusterClient()
|
||||
: createBasicClient();
|
||||
: isClusterAware() ? createClusterClient() : createBasicClient();
|
||||
}
|
||||
|
||||
private RedisClient createStaticMasterReplicaClient() {
|
||||
|
||||
RedisClient redisClient = this.clientConfiguration.getClientResources()
|
||||
.map(RedisClient::create)
|
||||
RedisClient redisClient = this.clientConfiguration.getClientResources().map(RedisClient::create)
|
||||
.orElseGet(RedisClient::create);
|
||||
|
||||
this.clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
|
||||
@@ -1318,8 +1303,8 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
|
||||
redisUri.setCredentialsProvider(factory.createCredentialsProvider(this.configuration));
|
||||
|
||||
RedisCredentialsProvider sentinelCredentials =
|
||||
factory.createSentinelCredentialsProvider((RedisSentinelConfiguration) this.configuration);
|
||||
RedisCredentialsProvider sentinelCredentials = factory
|
||||
.createSentinelCredentialsProvider((RedisSentinelConfiguration) this.configuration);
|
||||
|
||||
redisUri.getSentinels().forEach(it -> it.setCredentialsProvider(sentinelCredentials));
|
||||
});
|
||||
@@ -1335,8 +1320,7 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
ClusterConfiguration clusterConfiguration = (ClusterConfiguration) this.configuration;
|
||||
|
||||
clusterConfiguration.getClusterNodes().stream()
|
||||
.map(node -> createRedisURIAndApplySettings(node.getHost(), node.getPort()))
|
||||
.forEach(initialUris::add);
|
||||
.map(node -> createRedisURIAndApplySettings(node.getHost(), node.getPort())).forEach(initialUris::add);
|
||||
|
||||
RedisClusterClient clusterClient = this.clientConfiguration.getClientResources()
|
||||
.map(clientResources -> RedisClusterClient.create(clientResources, initialUris))
|
||||
@@ -1351,13 +1335,11 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
|
||||
Optional<ClientOptions> clientOptions = this.clientConfiguration.getClientOptions();
|
||||
|
||||
Optional<ClusterClientOptions> clusterClientOptions = clientOptions
|
||||
.filter(ClusterClientOptions.class::isInstance)
|
||||
Optional<ClusterClientOptions> clusterClientOptions = clientOptions.filter(ClusterClientOptions.class::isInstance)
|
||||
.map(ClusterClientOptions.class::cast);
|
||||
|
||||
ClusterClientOptions resolvedClusterClientOptions = clusterClientOptions.orElseGet(() -> clientOptions
|
||||
.map(it -> ClusterClientOptions.builder(it).build())
|
||||
.orElseGet(ClusterClientOptions::create));
|
||||
.map(it -> ClusterClientOptions.builder(it).build()).orElseGet(ClusterClientOptions::create));
|
||||
|
||||
if (clusterConfiguration.getMaxRedirects() != null) {
|
||||
return resolvedClusterClientOptions.mutate().maxRedirects(clusterConfiguration.getMaxRedirects()).build();
|
||||
@@ -1369,13 +1351,11 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
@SuppressWarnings("all")
|
||||
private RedisClient createBasicClient() {
|
||||
|
||||
RedisURI uri = isDomainSocketAware()
|
||||
? createRedisSocketURIAndApplySettings(getSocketConfiguration().getSocket())
|
||||
RedisURI uri = isDomainSocketAware() ? createRedisSocketURIAndApplySettings(getSocketConfiguration().getSocket())
|
||||
: createRedisURIAndApplySettings(getHostName(), getPort());
|
||||
|
||||
RedisClient redisClient = this.clientConfiguration.getClientResources()
|
||||
.map(clientResources -> RedisClient.create(clientResources, uri))
|
||||
.orElseGet(() -> RedisClient.create(uri));
|
||||
.map(clientResources -> RedisClient.create(clientResources, uri)).orElseGet(() -> RedisClient.create(uri));
|
||||
|
||||
this.clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
|
||||
|
||||
@@ -1444,8 +1424,8 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
getRedisPassword().toOptional().ifPresent(builder::withPassword);
|
||||
}
|
||||
|
||||
clientConfiguration.getRedisCredentialsProviderFactory().ifPresent(factory ->
|
||||
builder.withAuthentication(factory.createCredentialsProvider(this.configuration)));
|
||||
clientConfiguration.getRedisCredentialsProviderFactory()
|
||||
.ifPresent(factory -> builder.withAuthentication(factory.createCredentialsProvider(this.configuration)));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -1469,12 +1449,6 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
|
||||
return clientConfiguration.getCommandTimeout().toMillis();
|
||||
}
|
||||
|
||||
private void logWarning(String message, Object... arguments) {
|
||||
if (this.log.isWarnEnabled()) {
|
||||
this.log.warn(String.format(message, arguments));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for shared connections. Keeps track of the connection lifecycleThe wrapper is thread-safe as it
|
||||
* synchronizes concurrent calls by blocking.
|
||||
|
||||
Reference in New Issue
Block a user