Use Redis locking for value retrieval synchronization.

We now use RedisCacheWriter to acquire and maintain the lock for value retrieval synchronization.

Closes: #2890
Original Pull Request: #2948
This commit is contained in:
Mark Paluch
2024-07-29 09:23:37 +02:00
committed by Christoph Strobl
parent 777f079114
commit 06bbd3dcab
5 changed files with 169 additions and 160 deletions

View File

@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
@@ -137,9 +138,14 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
Assert.notNull(name, "Name must not be null");
Assert.notNull(key, "Key must not be null");
byte[] result = shouldExpireWithin(ttl)
? execute(name, connection -> connection.stringCommands().getEx(key, Expiration.from(ttl)))
: execute(name, connection -> connection.stringCommands().get(key));
return execute(name, connection -> doGet(connection, name, key, ttl));
}
@Nullable
private byte[] doGet(RedisConnection connection, String name, byte[] key, @Nullable Duration ttl) {
byte[] result = shouldExpireWithin(ttl) ? connection.stringCommands().getEx(key, Expiration.from(ttl))
: connection.stringCommands().get(key);
statistics.incGets(name);
@@ -152,6 +158,50 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
return result;
}
@Override
public byte[] get(String name, byte[] key, Supplier<byte[]> valueLoader, @Nullable Duration ttl,
boolean timeToIdleEnabled) {
Assert.notNull(name, "Name must not be null");
Assert.notNull(key, "Key must not be null");
boolean withTtl = shouldExpireWithin(ttl);
// double-checked locking optimization
if (isLockingCacheWriter()) {
byte[] bytes = get(name, key, timeToIdleEnabled && withTtl ? ttl : null);
if (bytes != null) {
return bytes;
}
}
return execute(name, connection -> {
boolean wasLocked = false;
if (isLockingCacheWriter()) {
doLock(name, key, null, connection);
wasLocked = true;
}
try {
byte[] result = doGet(connection, name, key, timeToIdleEnabled && withTtl ? ttl : null);
if (result != null) {
return result;
}
byte[] value = valueLoader.get();
doPut(connection, name, key, value, ttl);
return value;
} finally {
if (isLockingCacheWriter() && wasLocked) {
doUnlock(name, connection);
}
}
});
}
@Override
public boolean supportsAsyncRetrieve() {
return asyncCacheWriter.isSupported();
@@ -186,17 +236,21 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
Assert.notNull(value, "Value must not be null");
execute(name, connection -> {
if (shouldExpireWithin(ttl)) {
connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS),
SetOption.upsert());
} else {
connection.stringCommands().set(key, value);
}
doPut(connection, name, key, value, ttl);
return "OK";
});
}
private void doPut(RedisConnection connection, String name, byte[] key, byte[] value, @Nullable Duration ttl) {
if (shouldExpireWithin(ttl)) {
connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS),
SetOption.upsert());
} else {
connection.stringCommands().set(key, value);
}
statistics.incPuts(name);
}

View File

@@ -25,8 +25,6 @@ import java.util.Map.Entry;
import java.util.StringJoiner;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.springframework.cache.Cache;
@@ -64,8 +62,6 @@ public class RedisCache extends AbstractValueAdaptingCache {
static final String CACHE_RETRIEVAL_UNSUPPORTED_OPERATION_EXCEPTION_MESSAGE = "The Redis driver configured with RedisCache through RedisCacheWriter does not support CompletableFuture-based retrieval";
private final Lock lock = new ReentrantLock();
private final RedisCacheConfiguration cacheConfiguration;
private final RedisCacheWriter cacheWriter;
@@ -154,28 +150,18 @@ public class RedisCache extends AbstractValueAdaptingCache {
@SuppressWarnings("unchecked")
public <T> T get(Object key, Callable<T> valueLoader) {
ValueWrapper result = get(key);
byte[] binaryKey = createAndConvertCacheKey(key);
byte[] binaryValue = getCacheWriter().get(getName(), binaryKey,
() -> serializeCacheValue(toStoreValue(loadCacheValue(key, valueLoader))), getTimeToLive(key),
getCacheConfiguration().isTimeToIdleEnabled());
return result != null ? (T) result.get() : getSynchronized(key, valueLoader);
}
ValueWrapper result = toValueWrapper(deserializeCacheValue(binaryValue));
@Nullable
@SuppressWarnings("unchecked")
private <T> T getSynchronized(Object key, Callable<T> valueLoader) {
lock.lock();
try {
ValueWrapper result = get(key);
return result != null ? (T) result.get() : loadCacheValue(key, valueLoader);
} finally {
lock.unlock();
}
return result != null ? (T) result.get() : null;
}
/**
* Loads the {@link Object} using the given {@link Callable valueLoader} and {@link #put(Object, Object) puts} the
* {@link Object loaded value} in the cache.
* Loads the {@link Object} using the given {@link Callable valueLoader}.
*
* @param <T> {@link Class type} of the loaded {@link Object cache value}.
* @param key {@link Object key} mapped to the loaded {@link Object cache value}.
@@ -184,17 +170,11 @@ public class RedisCache extends AbstractValueAdaptingCache {
*/
protected <T> T loadCacheValue(Object key, Callable<T> valueLoader) {
T value;
try {
value = valueLoader.call();
return valueLoader.call();
} catch (Exception ex) {
throw new ValueRetrievalException(key, valueLoader, ex);
}
put(key, value);
return value;
}
@Override

View File

@@ -24,14 +24,14 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* {@link RedisCacheWriter} provides low-level access to Redis commands ({@code SET, SETNX, GET, EXPIRE,...})
* used for caching.
* {@link RedisCacheWriter} provides low-level access to Redis commands ({@code SET, SETNX, GET, EXPIRE,...}) used for
* caching.
* <p>
* The {@link RedisCacheWriter} may be shared by multiple cache implementations and is responsible for reading/writing
* binary data from/to Redis. The implementation honors potential cache lock flags that might be set.
* <p>
* The default {@link RedisCacheWriter} implementation can be customized with {@link BatchStrategy}
* to tune performance behavior.
* The default {@link RedisCacheWriter} implementation can be customized with {@link BatchStrategy} to tune performance
* behavior.
*
* @author Christoph Strobl
* @author Mark Paluch
@@ -96,9 +96,8 @@ public interface RedisCacheWriter extends CacheStatisticsProvider {
*
* @param connectionFactory must not be {@literal null}.
* @param sleepTime sleep time between lock access attempts, must not be {@literal null}.
* @param lockTtlFunction TTL function to compute the Lock TTL. The function is called with contextual keys
* and values (such as the cache name on cleanup or the actual key/value on put requests);
* must not be {@literal null}.
* @param lockTtlFunction TTL function to compute the Lock TTL. The function is called with contextual keys and values
* (such as the cache name on cleanup or the actual key/value on put requests); must not be {@literal null}.
* @param batchStrategy must not be {@literal null}.
* @return new instance of {@link DefaultRedisCacheWriter}.
* @since 3.2
@@ -124,8 +123,8 @@ public interface RedisCacheWriter extends CacheStatisticsProvider {
byte[] get(String name, byte[] key);
/**
* Get the binary value representation from Redis stored for the given key and set
* the given {@link Duration TTL expiration} for the cache entry.
* Get the binary value representation from Redis stored for the given key and set the given {@link Duration TTL
* expiration} for the cache entry.
*
* @param name must not be {@literal null}.
* @param key must not be {@literal null}.
@@ -138,14 +137,41 @@ public interface RedisCacheWriter extends CacheStatisticsProvider {
}
/**
* Determines whether the asynchronous {@link #retrieve(String, byte[])}
* and {@link #retrieve(String, byte[], Duration)} cache operations are supported by the implementation.
* Get the binary value representation from Redis stored for the given key and set the given {@link Duration TTL
* expiration} for the cache entry, obtaining the value from {@code valueLoader} if necessary.
* <p>
* The main factor for whether the {@literal retrieve} operation can be supported will primarily be determined
* by the Redis driver in use at runtime.
* If possible (and configured for locking), implementations should ensure that the loading operation is synchronized
* so that the specified {@code valueLoader} is only called once in case of concurrent access on the same key.
*
* @param name must not be {@literal null}.
* @param key must not be {@literal null}.
* @param valueLoader value loader that creates the value if the cache lookup has been not successful.
* @param ttl {@link Duration} specifying the {@literal expiration timeout} for the cache entry.
* @param timeToIdleEnabled {@literal true} to enable Time to Idle when retrieving the value.
* @since 3.4
*/
default byte[] get(String name, byte[] key, Supplier<byte[]> valueLoader, @Nullable Duration ttl,
boolean timeToIdleEnabled) {
byte[] bytes = timeToIdleEnabled ? get(name, key, ttl) : get(name, key);
if (bytes == null) {
bytes = valueLoader.get();
put(name, key, bytes, ttl);
}
return bytes;
}
/**
* Determines whether the asynchronous {@link #retrieve(String, byte[])} and
* {@link #retrieve(String, byte[], Duration)} cache operations are supported by the implementation.
* <p>
* Returns {@literal false} by default. This will have an effect of {@link RedisCache#retrieve(Object)}
* and {@link RedisCache#retrieve(Object, Supplier)} throwing an {@link UnsupportedOperationException}.
* The main factor for whether the {@literal retrieve} operation can be supported will primarily be determined by the
* Redis driver in use at runtime.
* <p>
* Returns {@literal false} by default. This will have an effect of {@link RedisCache#retrieve(Object)} and
* {@link RedisCache#retrieve(Object, Supplier)} throwing an {@link UnsupportedOperationException}.
*
* @return {@literal true} if asynchronous {@literal retrieve} operations are supported by the implementation.
* @since 3.2
@@ -155,8 +181,8 @@ public interface RedisCacheWriter extends CacheStatisticsProvider {
}
/**
* Asynchronously retrieves the {@link CompletableFuture value} to which the {@link RedisCache}
* maps the given {@link byte[] key}.
* Asynchronously retrieves the {@link CompletableFuture value} to which the {@link RedisCache} maps the given
* {@link byte[] key}.
* <p>
* This operation is non-blocking.
*
@@ -171,8 +197,8 @@ public interface RedisCacheWriter extends CacheStatisticsProvider {
}
/**
* Asynchronously retrieves the {@link CompletableFuture value} to which the {@link RedisCache} maps
* the given {@link byte[] key} setting the {@link Duration TTL expiration} for the cache entry.
* Asynchronously retrieves the {@link CompletableFuture value} to which the {@link RedisCache} maps the given
* {@link byte[] key} setting the {@link Duration TTL expiration} for the cache entry.
* <p>
* This operation is non-blocking.
*
@@ -264,8 +290,8 @@ public interface RedisCacheWriter extends CacheStatisticsProvider {
/**
* Creates a {@literal Singleton} {@link TtlFunction} using the given {@link Duration}.
*
* @param duration the time to live. Can be {@link Duration#ZERO} for persistent values (i.e. cache entry
* does not expire).
* @param duration the time to live. Can be {@link Duration#ZERO} for persistent values (i.e. cache entry does not
* expire).
* @return a singleton {@link TtlFunction} using {@link Duration}.
*/
static TtlFunction just(Duration duration) {

View File

@@ -242,6 +242,22 @@ public class DefaultRedisCacheWriterTests {
assertThat(writer.getCacheStatistics(CACHE_NAME).getPuts()).isOne();
}
@ParameterizedRedisTest // GH-2890
void getWithValueLoaderShouldStoreCacheValue() {
RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory)
.withStatisticsCollector(CacheStatisticsCollector.create());
writer.get(CACHE_NAME, binaryCacheKey, () -> binaryCacheValue, Duration.ofSeconds(5), true);
doWithConnection(connection -> {
assertThat(connection.ttl(binaryCacheKey)).isGreaterThan(3).isLessThan(6);
});
assertThat(writer.getCacheStatistics(CACHE_NAME).getMisses()).isOne();
assertThat(writer.getCacheStatistics(CACHE_NAME).getPuts()).isOne();
}
@ParameterizedRedisTest // DATAREDIS-481, DATAREDIS-1082
void removeShouldDeleteEntry() {

View File

@@ -35,6 +35,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -251,6 +252,37 @@ public class RedisCacheTests {
assertThat(result.get()).isEqualTo(null);
}
@ParameterizedRedisTest // GH-2890
void getWithValueLoaderShouldStoreNull() {
doWithConnection(connection -> connection.set(binaryCacheKey, binaryNullValue));
Object result = cache.get(key, () -> {
throw new IllegalStateException();
});
assertThat(result).isNull();
}
@ParameterizedRedisTest // GH-2890
void getWithValueLoaderShouldRetrieveValue() {
AtomicLong counter = new AtomicLong();
Object result = cache.get(key, () -> {
counter.incrementAndGet();
return sample;
});
assertThat(result).isEqualTo(sample);
result = cache.get(key, () -> {
counter.incrementAndGet();
return sample;
});
assertThat(result).isEqualTo(sample);
assertThat(counter).hasValue(1);
}
@ParameterizedRedisTest // DATAREDIS-481
void evictShouldRemoveKey() {
@@ -358,7 +390,7 @@ public class RedisCacheTests {
doWithConnection(connection -> assertThat(
connection.stringCommands().get("redis::cache::key-1".getBytes(StandardCharsets.UTF_8)))
.isEqualTo(binarySample));
.isEqualTo(binarySample));
}
@ParameterizedRedisTest // DATAREDIS-715
@@ -435,105 +467,6 @@ public class RedisCacheTests {
assertThatIllegalStateException().isThrownBy(() -> cache.put(key, sample));
}
@ParameterizedRedisTest // GH-2079
void multipleThreadsLoadValueOnce() throws InterruptedException {
int threadCount = 2;
CountDownLatch prepare = new CountDownLatch(threadCount);
CountDownLatch prepareForReturn = new CountDownLatch(1);
CountDownLatch finished = new CountDownLatch(threadCount);
AtomicInteger retrievals = new AtomicInteger();
AtomicReference<byte[]> storage = new AtomicReference<>();
cache = new RedisCache("foo", new RedisCacheWriter() {
@Override
public byte[] get(String name, byte[] key) {
return get(name, key, null);
}
@Override
public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
prepare.countDown();
try {
prepareForReturn.await(1, TimeUnit.MINUTES);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
return storage.get();
}
@Override
public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Duration ttl) {
byte[] value = get(name, key);
return CompletableFuture.completedFuture(value);
}
@Override
public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
return null;
}
@Override
public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
storage.set(value);
}
@Override
public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
return new byte[0];
}
@Override
public void remove(String name, byte[] key) {
}
@Override
public void clean(String name, byte[] pattern) {
}
@Override
public void clearStatistics(String name) {
}
@Override
public RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheStatisticsCollector) {
return null;
}
@Override
public CacheStatistics getCacheStatistics(String cacheName) {
return null;
}
}, RedisCacheConfiguration.defaultCacheConfig());
ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadCount, threadCount, 1, TimeUnit.MINUTES,
new LinkedBlockingDeque<>(), new DefaultThreadFactory("RedisCacheTests"));
IntStream.range(0, threadCount).forEach(it -> tpe.submit(() -> {
cache.get("foo", retrievals::incrementAndGet);
finished.countDown();
}));
// wait until all Threads have arrived in RedisCacheWriter.get(…)
prepare.await();
// let all threads continue
prepareForReturn.countDown();
// wait until ThreadPoolExecutor has completed.
finished.await();
tpe.shutdown();
assertThat(retrievals).hasValue(1);
}
@EnabledOnCommand("GETEX")
@ParameterizedRedisTest // GH-2351
void cacheGetWithTimeToIdleExpirationWhenEntryNotExpiredShouldReturnValue() {