diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java index ffcf5200c..0ec7bf8ab 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java @@ -299,11 +299,18 @@ public class JedisConnection extends AbstractRedisConnection { super.close(); + JedisSubscription subscription = this.subscription; + if (subscription != null) { + subscription.close(); + this.subscription = null; + } + // return the connection to the pool if (pool != null) { jedis.close(); return; } + // else close the connection normally (doing the try/catch dance) Exception exc = null; try { @@ -316,8 +323,10 @@ public class JedisConnection extends AbstractRedisConnection { } catch (Exception ex) { exc = ex; } - if (exc != null) + + if (exc != null) { throw convertJedisAccessException(exc); + } } @Override diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java index 62e612e98..a85a3a878 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java @@ -62,6 +62,7 @@ import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.dao.QueryTimeoutException; import org.springframework.data.redis.ExceptionTranslationStrategy; import org.springframework.data.redis.FallbackExceptionTranslationStrategy; +import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.*; import org.springframework.data.redis.connection.convert.TransactionResultConverter; import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider.TargetAware; @@ -351,22 +352,29 @@ public class LettuceConnection extends AbstractRedisConnection { isClosed = true; + reset(); + } + + private void reset() { + if (asyncDedicatedConn != null) { try { if (customizedDatabaseIndex()) { potentiallySelectDatabase(defaultDbIndex); } connectionProvider.release(asyncDedicatedConn); + asyncDedicatedConn = null; } catch (RuntimeException ex) { throw convertLettuceAccessException(ex); } } + LettuceSubscription subscription = this.subscription; if (subscription != null) { if (subscription.isAlive()) { subscription.doClose(); } - subscription = null; + this.subscription = null; } this.dbIndex = defaultDbIndex; @@ -381,7 +389,8 @@ public class LettuceConnection extends AbstractRedisConnection { public RedisClusterAsyncCommands getNativeConnection() { LettuceSubscription subscription = this.subscription; - return (subscription != null ? subscription.getNativeConnection().async() : getAsyncConnection()); + return (subscription != null && subscription.isAlive() ? subscription.getNativeConnection().async() + : getAsyncConnection()); } @Override @@ -509,8 +518,8 @@ public class LettuceConnection extends AbstractRedisConnection { LettuceTransactionResultConverter resultConverter = new LettuceTransactionResultConverter( new LinkedList<>(txResults), exceptionConverter); - pipeline(newLettuceResult(exec, source -> resultConverter - .convert(LettuceConverters.transactionResultUnwrapper().convert(source)))); + pipeline(newLettuceResult(exec, + source -> resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source)))); return null; } @@ -695,7 +704,8 @@ public class LettuceConnection extends AbstractRedisConnection { @SuppressWarnings("unchecked") protected StatefulRedisPubSubConnection switchToPubSub() { - close(); + checkSubscription(); + reset(); return connectionProvider.getConnection(StatefulRedisPubSubConnection.class); } @@ -870,6 +880,10 @@ public class LettuceConnection extends AbstractRedisConnection { protected RedisClusterAsyncCommands getAsyncDedicatedConnection() { + if (isClosed()) { + throw new RedisSystemException("Connection is closed", null); + } + StatefulConnection connection = getOrCreateDedicatedConnection(); if (connection instanceof StatefulRedisConnection) { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java index 2c89deffa..44225b206 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java @@ -60,7 +60,8 @@ public class LettuceSubscription extends AbstractSubscription { this.connection = pubsubConnection; this.listener = new LettuceMessageListener(listener, - listener instanceof SubscriptionListener ? (SubscriptionListener) listener : SubscriptionListener.NO_OP_SUBSCRIPTION_LISTENER); + listener instanceof SubscriptionListener ? (SubscriptionListener) listener + : SubscriptionListener.NO_OP_SUBSCRIPTION_LISTENER); this.connectionProvider = connectionProvider; this.pubsub = connection.sync(); this.pubSubAsync = connection.async(); @@ -75,6 +76,10 @@ public class LettuceSubscription extends AbstractSubscription { @Override protected void doClose() { + if (!isAlive()) { + return; + } + List> futures = new ArrayList<>(); if (!getChannels().isEmpty()) { diff --git a/src/main/java/org/springframework/data/redis/connection/util/AbstractSubscription.java b/src/main/java/org/springframework/data/redis/connection/util/AbstractSubscription.java index 1bf0cf184..0f1f439c3 100644 --- a/src/main/java/org/springframework/data/redis/connection/util/AbstractSubscription.java +++ b/src/main/java/org/springframework/data/redis/connection/util/AbstractSubscription.java @@ -100,6 +100,7 @@ public abstract class AbstractSubscription implements Subscription { @Override public void close() { doClose(); + alive.set(false); } /** @@ -231,8 +232,7 @@ public abstract class AbstractSubscription implements Subscription { private void closeIfUnsubscribed() { if (channels.isEmpty() && patterns.isEmpty()) { - alive.set(false); - doClose(); + close(); } } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java index 77f328ffe..1be340af1 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java @@ -22,14 +22,12 @@ import io.lettuce.core.EpollProvider; import io.lettuce.core.KqueueProvider; import io.lettuce.core.ReadFrom; import io.lettuce.core.RedisException; -import io.lettuce.core.RedisFuture; import io.lettuce.core.api.async.RedisAsyncCommands; import io.lettuce.core.api.reactive.BaseRedisReactiveCommands; import reactor.test.StepVerifier; import java.io.File; import java.time.Duration; -import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -225,19 +223,11 @@ class LettuceConnectionFactoryTests { factory.setShareNativeConnection(false); RedisConnection conn2 = factory.getConnection(); assertThat(conn2.getNativeConnection()).isNotSameAs(connection.getNativeConnection()); - // Give some time for native connection to asynchronously initialize, else close doesn't work Thread.sleep(100); conn2.close(); assertThat(conn2.isClosed()).isTrue(); - // Give some time for native connection to asynchronously close - Thread.sleep(100); - RedisFuture future = ((RedisAsyncCommands) conn2.getNativeConnection()).ping(); - try { - future.get(); - fail("The native connection should be closed"); - } catch (ExecutionException e) { - // expected, Lettuce async failures are signalled on the Future - } + + assertThatExceptionOfType(RedisSystemException.class).isThrownBy(conn2::getNativeConnection); } @SuppressWarnings("unchecked")