diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index 205487fa1..e7fe73ab6 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.function.Supplier; import org.springframework.dao.PessimisticLockingFailureException; import org.springframework.data.redis.connection.ReactiveRedisConnection; @@ -137,9 +138,14 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { Assert.notNull(name, "Name must not be null"); Assert.notNull(key, "Key must not be null"); - byte[] result = shouldExpireWithin(ttl) - ? execute(name, connection -> connection.stringCommands().getEx(key, Expiration.from(ttl))) - : execute(name, connection -> connection.stringCommands().get(key)); + return execute(name, connection -> doGet(connection, name, key, ttl)); + } + + @Nullable + private byte[] doGet(RedisConnection connection, String name, byte[] key, @Nullable Duration ttl) { + + byte[] result = shouldExpireWithin(ttl) ? connection.stringCommands().getEx(key, Expiration.from(ttl)) + : connection.stringCommands().get(key); statistics.incGets(name); @@ -152,6 +158,50 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { return result; } + @Override + public byte[] get(String name, byte[] key, Supplier valueLoader, @Nullable Duration ttl, + boolean timeToIdleEnabled) { + + Assert.notNull(name, "Name must not be null"); + Assert.notNull(key, "Key must not be null"); + + boolean withTtl = shouldExpireWithin(ttl); + + // double-checked locking optimization + if (isLockingCacheWriter()) { + byte[] bytes = get(name, key, timeToIdleEnabled && withTtl ? ttl : null); + if (bytes != null) { + return bytes; + } + } + + return execute(name, connection -> { + + boolean wasLocked = false; + if (isLockingCacheWriter()) { + doLock(name, key, null, connection); + wasLocked = true; + } + + try { + + byte[] result = doGet(connection, name, key, timeToIdleEnabled && withTtl ? ttl : null); + + if (result != null) { + return result; + } + + byte[] value = valueLoader.get(); + doPut(connection, name, key, value, ttl); + return value; + } finally { + if (isLockingCacheWriter() && wasLocked) { + doUnlock(name, connection); + } + } + }); + } + @Override public boolean supportsAsyncRetrieve() { return asyncCacheWriter.isSupported(); @@ -186,17 +236,21 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { Assert.notNull(value, "Value must not be null"); execute(name, connection -> { - - if (shouldExpireWithin(ttl)) { - connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), - SetOption.upsert()); - } else { - connection.stringCommands().set(key, value); - } - + doPut(connection, name, key, value, ttl); return "OK"; }); + } + + private void doPut(RedisConnection connection, String name, byte[] key, byte[] value, @Nullable Duration ttl) { + + if (shouldExpireWithin(ttl)) { + connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), + SetOption.upsert()); + } else { + connection.stringCommands().set(key, value); + } + statistics.incPuts(name); } diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCache.java b/src/main/java/org/springframework/data/redis/cache/RedisCache.java index 187add251..fb94eb244 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCache.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCache.java @@ -25,8 +25,6 @@ import java.util.Map.Entry; import java.util.StringJoiner; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import org.springframework.cache.Cache; @@ -64,8 +62,6 @@ public class RedisCache extends AbstractValueAdaptingCache { static final String CACHE_RETRIEVAL_UNSUPPORTED_OPERATION_EXCEPTION_MESSAGE = "The Redis driver configured with RedisCache through RedisCacheWriter does not support CompletableFuture-based retrieval"; - private final Lock lock = new ReentrantLock(); - private final RedisCacheConfiguration cacheConfiguration; private final RedisCacheWriter cacheWriter; @@ -154,28 +150,18 @@ public class RedisCache extends AbstractValueAdaptingCache { @SuppressWarnings("unchecked") public T get(Object key, Callable valueLoader) { - ValueWrapper result = get(key); + byte[] binaryKey = createAndConvertCacheKey(key); + byte[] binaryValue = getCacheWriter().get(getName(), binaryKey, + () -> serializeCacheValue(toStoreValue(loadCacheValue(key, valueLoader))), getTimeToLive(key), + getCacheConfiguration().isTimeToIdleEnabled()); - return result != null ? (T) result.get() : getSynchronized(key, valueLoader); - } + ValueWrapper result = toValueWrapper(deserializeCacheValue(binaryValue)); - @Nullable - @SuppressWarnings("unchecked") - private T getSynchronized(Object key, Callable valueLoader) { - - lock.lock(); - - try { - ValueWrapper result = get(key); - return result != null ? (T) result.get() : loadCacheValue(key, valueLoader); - } finally { - lock.unlock(); - } + return result != null ? (T) result.get() : null; } /** - * Loads the {@link Object} using the given {@link Callable valueLoader} and {@link #put(Object, Object) puts} the - * {@link Object loaded value} in the cache. + * Loads the {@link Object} using the given {@link Callable valueLoader}. * * @param {@link Class type} of the loaded {@link Object cache value}. * @param key {@link Object key} mapped to the loaded {@link Object cache value}. @@ -184,17 +170,11 @@ public class RedisCache extends AbstractValueAdaptingCache { */ protected T loadCacheValue(Object key, Callable valueLoader) { - T value; - try { - value = valueLoader.call(); + return valueLoader.call(); } catch (Exception ex) { throw new ValueRetrievalException(key, valueLoader, ex); } - - put(key, value); - - return value; } @Override diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java index 04dd0e350..fbf7e96dc 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java @@ -24,14 +24,14 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** - * {@link RedisCacheWriter} provides low-level access to Redis commands ({@code SET, SETNX, GET, EXPIRE,...}) - * used for caching. + * {@link RedisCacheWriter} provides low-level access to Redis commands ({@code SET, SETNX, GET, EXPIRE,...}) used for + * caching. *

* The {@link RedisCacheWriter} may be shared by multiple cache implementations and is responsible for reading/writing * binary data from/to Redis. The implementation honors potential cache lock flags that might be set. *

- * The default {@link RedisCacheWriter} implementation can be customized with {@link BatchStrategy} - * to tune performance behavior. + * The default {@link RedisCacheWriter} implementation can be customized with {@link BatchStrategy} to tune performance + * behavior. * * @author Christoph Strobl * @author Mark Paluch @@ -96,9 +96,8 @@ public interface RedisCacheWriter extends CacheStatisticsProvider { * * @param connectionFactory must not be {@literal null}. * @param sleepTime sleep time between lock access attempts, must not be {@literal null}. - * @param lockTtlFunction TTL function to compute the Lock TTL. The function is called with contextual keys - * and values (such as the cache name on cleanup or the actual key/value on put requests); - * must not be {@literal null}. + * @param lockTtlFunction TTL function to compute the Lock TTL. The function is called with contextual keys and values + * (such as the cache name on cleanup or the actual key/value on put requests); must not be {@literal null}. * @param batchStrategy must not be {@literal null}. * @return new instance of {@link DefaultRedisCacheWriter}. * @since 3.2 @@ -124,8 +123,8 @@ public interface RedisCacheWriter extends CacheStatisticsProvider { byte[] get(String name, byte[] key); /** - * Get the binary value representation from Redis stored for the given key and set - * the given {@link Duration TTL expiration} for the cache entry. + * Get the binary value representation from Redis stored for the given key and set the given {@link Duration TTL + * expiration} for the cache entry. * * @param name must not be {@literal null}. * @param key must not be {@literal null}. @@ -138,14 +137,41 @@ public interface RedisCacheWriter extends CacheStatisticsProvider { } /** - * Determines whether the asynchronous {@link #retrieve(String, byte[])} - * and {@link #retrieve(String, byte[], Duration)} cache operations are supported by the implementation. + * Get the binary value representation from Redis stored for the given key and set the given {@link Duration TTL + * expiration} for the cache entry, obtaining the value from {@code valueLoader} if necessary. *

- * The main factor for whether the {@literal retrieve} operation can be supported will primarily be determined - * by the Redis driver in use at runtime. + * If possible (and configured for locking), implementations should ensure that the loading operation is synchronized + * so that the specified {@code valueLoader} is only called once in case of concurrent access on the same key. + * + * @param name must not be {@literal null}. + * @param key must not be {@literal null}. + * @param valueLoader value loader that creates the value if the cache lookup has been not successful. + * @param ttl {@link Duration} specifying the {@literal expiration timeout} for the cache entry. + * @param timeToIdleEnabled {@literal true} to enable Time to Idle when retrieving the value. + * @since 3.4 + */ + default byte[] get(String name, byte[] key, Supplier valueLoader, @Nullable Duration ttl, + boolean timeToIdleEnabled) { + + byte[] bytes = timeToIdleEnabled ? get(name, key, ttl) : get(name, key); + + if (bytes == null) { + bytes = valueLoader.get(); + put(name, key, bytes, ttl); + } + + return bytes; + } + + /** + * Determines whether the asynchronous {@link #retrieve(String, byte[])} and + * {@link #retrieve(String, byte[], Duration)} cache operations are supported by the implementation. *

- * Returns {@literal false} by default. This will have an effect of {@link RedisCache#retrieve(Object)} - * and {@link RedisCache#retrieve(Object, Supplier)} throwing an {@link UnsupportedOperationException}. + * The main factor for whether the {@literal retrieve} operation can be supported will primarily be determined by the + * Redis driver in use at runtime. + *

+ * Returns {@literal false} by default. This will have an effect of {@link RedisCache#retrieve(Object)} and + * {@link RedisCache#retrieve(Object, Supplier)} throwing an {@link UnsupportedOperationException}. * * @return {@literal true} if asynchronous {@literal retrieve} operations are supported by the implementation. * @since 3.2 @@ -155,8 +181,8 @@ public interface RedisCacheWriter extends CacheStatisticsProvider { } /** - * Asynchronously retrieves the {@link CompletableFuture value} to which the {@link RedisCache} - * maps the given {@link byte[] key}. + * Asynchronously retrieves the {@link CompletableFuture value} to which the {@link RedisCache} maps the given + * {@link byte[] key}. *

* This operation is non-blocking. * @@ -171,8 +197,8 @@ public interface RedisCacheWriter extends CacheStatisticsProvider { } /** - * Asynchronously retrieves the {@link CompletableFuture value} to which the {@link RedisCache} maps - * the given {@link byte[] key} setting the {@link Duration TTL expiration} for the cache entry. + * Asynchronously retrieves the {@link CompletableFuture value} to which the {@link RedisCache} maps the given + * {@link byte[] key} setting the {@link Duration TTL expiration} for the cache entry. *

* This operation is non-blocking. * @@ -264,8 +290,8 @@ public interface RedisCacheWriter extends CacheStatisticsProvider { /** * Creates a {@literal Singleton} {@link TtlFunction} using the given {@link Duration}. * - * @param duration the time to live. Can be {@link Duration#ZERO} for persistent values (i.e. cache entry - * does not expire). + * @param duration the time to live. Can be {@link Duration#ZERO} for persistent values (i.e. cache entry does not + * expire). * @return a singleton {@link TtlFunction} using {@link Duration}. */ static TtlFunction just(Duration duration) { diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java index 1f07ea611..e903b24bc 100644 --- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java +++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java @@ -242,6 +242,22 @@ public class DefaultRedisCacheWriterTests { assertThat(writer.getCacheStatistics(CACHE_NAME).getPuts()).isOne(); } + @ParameterizedRedisTest // GH-2890 + void getWithValueLoaderShouldStoreCacheValue() { + + RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory) + .withStatisticsCollector(CacheStatisticsCollector.create()); + + writer.get(CACHE_NAME, binaryCacheKey, () -> binaryCacheValue, Duration.ofSeconds(5), true); + + doWithConnection(connection -> { + assertThat(connection.ttl(binaryCacheKey)).isGreaterThan(3).isLessThan(6); + }); + + assertThat(writer.getCacheStatistics(CACHE_NAME).getMisses()).isOne(); + assertThat(writer.getCacheStatistics(CACHE_NAME).getPuts()).isOne(); + } + @ParameterizedRedisTest // DATAREDIS-481, DATAREDIS-1082 void removeShouldDeleteEntry() { diff --git a/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java b/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java index 6677ab7a9..4dfbd0d1e 100644 --- a/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java +++ b/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java @@ -35,6 +35,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -251,6 +252,37 @@ public class RedisCacheTests { assertThat(result.get()).isEqualTo(null); } + @ParameterizedRedisTest // GH-2890 + void getWithValueLoaderShouldStoreNull() { + + doWithConnection(connection -> connection.set(binaryCacheKey, binaryNullValue)); + + Object result = cache.get(key, () -> { + throw new IllegalStateException(); + }); + + assertThat(result).isNull(); + } + + @ParameterizedRedisTest // GH-2890 + void getWithValueLoaderShouldRetrieveValue() { + + AtomicLong counter = new AtomicLong(); + Object result = cache.get(key, () -> { + counter.incrementAndGet(); + return sample; + }); + + assertThat(result).isEqualTo(sample); + result = cache.get(key, () -> { + counter.incrementAndGet(); + return sample; + }); + + assertThat(result).isEqualTo(sample); + assertThat(counter).hasValue(1); + } + @ParameterizedRedisTest // DATAREDIS-481 void evictShouldRemoveKey() { @@ -358,7 +390,7 @@ public class RedisCacheTests { doWithConnection(connection -> assertThat( connection.stringCommands().get("redis::cache::key-1".getBytes(StandardCharsets.UTF_8))) - .isEqualTo(binarySample)); + .isEqualTo(binarySample)); } @ParameterizedRedisTest // DATAREDIS-715 @@ -435,105 +467,6 @@ public class RedisCacheTests { assertThatIllegalStateException().isThrownBy(() -> cache.put(key, sample)); } - @ParameterizedRedisTest // GH-2079 - void multipleThreadsLoadValueOnce() throws InterruptedException { - - int threadCount = 2; - - CountDownLatch prepare = new CountDownLatch(threadCount); - CountDownLatch prepareForReturn = new CountDownLatch(1); - CountDownLatch finished = new CountDownLatch(threadCount); - AtomicInteger retrievals = new AtomicInteger(); - AtomicReference storage = new AtomicReference<>(); - - cache = new RedisCache("foo", new RedisCacheWriter() { - - @Override - public byte[] get(String name, byte[] key) { - return get(name, key, null); - } - - @Override - public byte[] get(String name, byte[] key, @Nullable Duration ttl) { - - prepare.countDown(); - try { - prepareForReturn.await(1, TimeUnit.MINUTES); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - - return storage.get(); - } - - @Override - public CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl) { - byte[] value = get(name, key); - return CompletableFuture.completedFuture(value); - } - - @Override - public CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl) { - return null; - } - - @Override - public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) { - storage.set(value); - } - - @Override - public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Duration ttl) { - return new byte[0]; - } - - @Override - public void remove(String name, byte[] key) { - - } - - @Override - public void clean(String name, byte[] pattern) { - - } - - @Override - public void clearStatistics(String name) { - - } - - @Override - public RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheStatisticsCollector) { - return null; - } - - @Override - public CacheStatistics getCacheStatistics(String cacheName) { - return null; - } - }, RedisCacheConfiguration.defaultCacheConfig()); - - ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadCount, threadCount, 1, TimeUnit.MINUTES, - new LinkedBlockingDeque<>(), new DefaultThreadFactory("RedisCacheTests")); - - IntStream.range(0, threadCount).forEach(it -> tpe.submit(() -> { - cache.get("foo", retrievals::incrementAndGet); - finished.countDown(); - })); - - // wait until all Threads have arrived in RedisCacheWriter.get(…) - prepare.await(); - - // let all threads continue - prepareForReturn.countDown(); - - // wait until ThreadPoolExecutor has completed. - finished.await(); - tpe.shutdown(); - - assertThat(retrievals).hasValue(1); - } - @EnabledOnCommand("GETEX") @ParameterizedRedisTest // GH-2351 void cacheGetWithTimeToIdleExpirationWhenEntryNotExpiredShouldReturnValue() {