Correctly release connection after switching to Pub/Sub mode.

LettuceConnection.switchToPubSub now correctly releases its underlying connection when switching to Pub/Sub. Also, we improved safeguards to avoid using closed connections.

Closes #2331
This commit is contained in:
Mark Paluch
2022-06-03 09:14:18 +02:00
parent 08ed1c472b
commit e4f5c433e2
5 changed files with 39 additions and 21 deletions

View File

@@ -299,11 +299,18 @@ public class JedisConnection extends AbstractRedisConnection {
super.close();
JedisSubscription subscription = this.subscription;
if (subscription != null) {
subscription.close();
this.subscription = null;
}
// return the connection to the pool
if (pool != null) {
jedis.close();
return;
}
// else close the connection normally (doing the try/catch dance)
Exception exc = null;
try {
@@ -316,8 +323,10 @@ public class JedisConnection extends AbstractRedisConnection {
} catch (Exception ex) {
exc = ex;
}
if (exc != null)
if (exc != null) {
throw convertJedisAccessException(exc);
}
}
@Override

View File

@@ -62,6 +62,7 @@ import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
import org.springframework.data.redis.FallbackExceptionTranslationStrategy;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider.TargetAware;
@@ -351,22 +352,29 @@ public class LettuceConnection extends AbstractRedisConnection {
isClosed = true;
reset();
}
private void reset() {
if (asyncDedicatedConn != null) {
try {
if (customizedDatabaseIndex()) {
potentiallySelectDatabase(defaultDbIndex);
}
connectionProvider.release(asyncDedicatedConn);
asyncDedicatedConn = null;
} catch (RuntimeException ex) {
throw convertLettuceAccessException(ex);
}
}
LettuceSubscription subscription = this.subscription;
if (subscription != null) {
if (subscription.isAlive()) {
subscription.doClose();
}
subscription = null;
this.subscription = null;
}
this.dbIndex = defaultDbIndex;
@@ -381,7 +389,8 @@ public class LettuceConnection extends AbstractRedisConnection {
public RedisClusterAsyncCommands<byte[], byte[]> getNativeConnection() {
LettuceSubscription subscription = this.subscription;
return (subscription != null ? subscription.getNativeConnection().async() : getAsyncConnection());
return (subscription != null && subscription.isAlive() ? subscription.getNativeConnection().async()
: getAsyncConnection());
}
@Override
@@ -509,8 +518,8 @@ public class LettuceConnection extends AbstractRedisConnection {
LettuceTransactionResultConverter resultConverter = new LettuceTransactionResultConverter(
new LinkedList<>(txResults), exceptionConverter);
pipeline(newLettuceResult(exec, source -> resultConverter
.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));
pipeline(newLettuceResult(exec,
source -> resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));
return null;
}
@@ -695,7 +704,8 @@ public class LettuceConnection extends AbstractRedisConnection {
@SuppressWarnings("unchecked")
protected StatefulRedisPubSubConnection<byte[], byte[]> switchToPubSub() {
close();
checkSubscription();
reset();
return connectionProvider.getConnection(StatefulRedisPubSubConnection.class);
}
@@ -870,6 +880,10 @@ public class LettuceConnection extends AbstractRedisConnection {
protected RedisClusterAsyncCommands<byte[], byte[]> getAsyncDedicatedConnection() {
if (isClosed()) {
throw new RedisSystemException("Connection is closed", null);
}
StatefulConnection<byte[], byte[]> connection = getOrCreateDedicatedConnection();
if (connection instanceof StatefulRedisConnection) {

View File

@@ -60,7 +60,8 @@ public class LettuceSubscription extends AbstractSubscription {
this.connection = pubsubConnection;
this.listener = new LettuceMessageListener(listener,
listener instanceof SubscriptionListener ? (SubscriptionListener) listener : SubscriptionListener.NO_OP_SUBSCRIPTION_LISTENER);
listener instanceof SubscriptionListener ? (SubscriptionListener) listener
: SubscriptionListener.NO_OP_SUBSCRIPTION_LISTENER);
this.connectionProvider = connectionProvider;
this.pubsub = connection.sync();
this.pubSubAsync = connection.async();
@@ -75,6 +76,10 @@ public class LettuceSubscription extends AbstractSubscription {
@Override
protected void doClose() {
if (!isAlive()) {
return;
}
List<CompletableFuture<?>> futures = new ArrayList<>();
if (!getChannels().isEmpty()) {

View File

@@ -100,6 +100,7 @@ public abstract class AbstractSubscription implements Subscription {
@Override
public void close() {
doClose();
alive.set(false);
}
/**
@@ -231,8 +232,7 @@ public abstract class AbstractSubscription implements Subscription {
private void closeIfUnsubscribed() {
if (channels.isEmpty() && patterns.isEmpty()) {
alive.set(false);
doClose();
close();
}
}