Add caching annotation support for CompletableFuture and reactive return values

Includes CompletableFuture-based retrieve operations on Spring's Cache interface.
Includes support for retrieve operations on CaffeineCache and ConcurrentMapCache.
Includes async cache mode option on CaffeineCacheManager.

Closes gh-17559
Closes gh-17920
Closes gh-30122
This commit is contained in:
Juergen Hoeller
2023-07-21 20:27:23 +02:00
parent d65d285378
commit f99faac073
9 changed files with 952 additions and 145 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,20 +17,28 @@
package org.springframework.cache;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.springframework.lang.Nullable;
/**
* Interface that defines common cache operations.
*
* <b>Note:</b> Due to the generic use of caching, it is recommended that
* implementations allow storage of {@code null} values (for example to
* cache methods that return {@code null}).
* <p>Serves as an SPI for Spring's annotation-based caching model
* ({@link org.springframework.cache.annotation.Cacheable} and co)
* as well as an API for direct usage in applications.
*
* <p><b>Note:</b> Due to the generic use of caching, it is recommended
* that implementations allow storage of {@code null} values
* (for example to cache methods that return {@code null}).
*
* @author Costin Leau
* @author Juergen Hoeller
* @author Stephane Nicoll
* @since 3.1
* @see CacheManager
* @see org.springframework.cache.annotation.Cacheable
*/
public interface Cache {
@@ -100,6 +108,51 @@ public interface Cache {
@Nullable
<T> T get(Object key, Callable<T> valueLoader);
/**
* Return the value to which this cache maps the specified key,
* wrapped in a {@link CompletableFuture}. This operation must not block
* but is allowed to return a completed {@link CompletableFuture} if the
* corresponding value is immediately available.
* <p>Returns {@code null} if the cache contains no mapping for this key;
* otherwise, the cached value (which may be {@code null} itself) will
* be returned in the {@link CompletableFuture}.
* @param key the key whose associated value is to be returned
* @return the value to which this cache maps the specified key,
* contained within a {@link CompletableFuture} which may also hold
* a cached {@code null} value. A straight {@code null} being
* returned means that the cache contains no mapping for this key.
* @since 6.1
* @see #get(Object)
*/
@Nullable
default CompletableFuture<?> retrieve(Object key) {
throw new UnsupportedOperationException(
getClass().getName() + " does not support CompletableFuture-based retrieval");
}
/**
* Return the value to which this cache maps the specified key, obtaining
* that value from {@code valueLoader} if necessary. This method provides
* a simple substitute for the conventional "if cached, return; otherwise
* create, cache and return" pattern, based on {@link CompletableFuture}.
* This operation must not block.
* <p>If possible, implementations should ensure that the loading operation
* is synchronized so that the specified {@code valueLoader} is only called
* once in case of concurrent access on the same key.
* <p>If the {@code valueLoader} throws an exception, it will be propagated
* to the {@code CompletableFuture} handle returned from here.
* @param key the key whose associated value is to be returned
* @return the value to which this cache maps the specified key,
* contained within a {@link CompletableFuture}
* @since 6.1
* @see #retrieve(Object)
* @see #get(Object, Callable)
*/
default <T> CompletableFuture<T> retrieve(Object key, Supplier<CompletableFuture<T>> valueLoader) {
throw new UnsupportedOperationException(
getClass().getName() + " does not support CompletableFuture-based retrieval");
}
/**
* Associate the specified value with the specified key in this cache.
* <p>If the cache previously contained a mapping for this key, the old
@@ -108,6 +161,11 @@ public interface Cache {
* fashion, with subsequent lookups possibly not seeing the entry yet.
* This may for example be the case with transactional cache decorators.
* Use {@link #putIfAbsent} for guaranteed immediate registration.
* <p>If the cache is supposed to be compatible with {@link CompletableFuture}
* and reactive interactions, the put operation needs to be effectively
* non-blocking, with any backend write-through happening asynchronously.
* This goes along with a cache implemented and configured to support
* {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)}.
* @param key the key with which the specified value is to be associated
* @param value the value to be associated with the specified key
* @see #putIfAbsent(Object, Object)
@@ -156,6 +214,11 @@ public interface Cache {
* fashion, with subsequent lookups possibly still seeing the entry.
* This may for example be the case with transactional cache decorators.
* Use {@link #evictIfPresent} for guaranteed immediate removal.
* <p>If the cache is supposed to be compatible with {@link CompletableFuture}
* and reactive interactions, the evict operation needs to be effectively
* non-blocking, with any backend write-through happening asynchronously.
* This goes along with a cache implemented and configured to support
* {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)}.
* @param key the key whose mapping is to be removed from the cache
* @see #evictIfPresent(Object)
*/
@@ -188,6 +251,11 @@ public interface Cache {
* fashion, with subsequent lookups possibly still seeing the entries.
* This may for example be the case with transactional cache decorators.
* Use {@link #invalidate()} for guaranteed immediate removal of entries.
* <p>If the cache is supposed to be compatible with {@link CompletableFuture}
* and reactive interactions, the clear operation needs to be effectively
* non-blocking, with any backend write-through happening asynchronously.
* This goes along with a cache implemented and configured to support
* {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)}.
* @see #invalidate()
*/
void clear();

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,8 +17,11 @@
package org.springframework.cache.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import org.springframework.cache.support.AbstractValueAdaptingCache;
import org.springframework.core.serializer.support.SerializationDelegate;
@@ -26,13 +29,17 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Simple {@link org.springframework.cache.Cache} implementation based on the
* core JDK {@code java.util.concurrent} package.
* Simple {@link org.springframework.cache.Cache} implementation based on the core
* JDK {@code java.util.concurrent} package.
*
* <p>Useful for testing or simple caching scenarios, typically in combination
* with {@link org.springframework.cache.support.SimpleCacheManager} or
* dynamically through {@link ConcurrentMapCacheManager}.
*
* <p>Supports the {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)}
* operations in a best-effort fashion, relying on default {@link CompletableFuture}
* execution (typically within the JVM's {@link ForkJoinPool#commonPool()}).
*
* <p><b>Note:</b> As {@link ConcurrentHashMap} (the default implementation used)
* does not allow for {@code null} values to be stored, this class will replace
* them with a predefined internal object. This behavior can be changed through the
@@ -149,6 +156,20 @@ public class ConcurrentMapCache extends AbstractValueAdaptingCache {
}));
}
@Override
@Nullable
public CompletableFuture<?> retrieve(Object key) {
Object value = lookup(key);
return (value != null ? CompletableFuture.completedFuture(fromStoreValue(value)) : null);
}
@SuppressWarnings("unchecked")
@Override
public <T> CompletableFuture<T> retrieve(Object key, Supplier<CompletableFuture<T>> valueLoader) {
return CompletableFuture.supplyAsync(() ->
(T) fromStoreValue(this.store.computeIfAbsent(key, k -> toStoreValue(valueLoader.get().join()))));
}
@Override
public void put(Object key, @Nullable Object value) {
this.store.put(key, toStoreValue(value));

View File

@@ -24,11 +24,16 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.support.AopUtils;
@@ -43,6 +48,8 @@ import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.context.expression.AnnotatedElementKey;
import org.springframework.core.BridgeMethodResolver;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.expression.EvaluationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@@ -83,12 +90,18 @@ import org.springframework.util.function.SupplierUtils;
public abstract class CacheAspectSupport extends AbstractCacheInvoker
implements BeanFactoryAware, InitializingBean, SmartInitializingSingleton {
private static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
"org.reactivestreams.Publisher", CacheAspectSupport.class.getClassLoader());
protected final Log logger = LogFactory.getLog(getClass());
private final Map<CacheOperationCacheKey, CacheOperationMetadata> metadataCache = new ConcurrentHashMap<>(1024);
private final CacheOperationExpressionEvaluator evaluator = new CacheOperationExpressionEvaluator();
@Nullable
private final ReactiveCachingHandler reactiveCachingHandler;
@Nullable
private CacheOperationSource cacheOperationSource;
@@ -103,6 +116,11 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
private boolean initialized = false;
protected CacheAspectSupport() {
this.reactiveCachingHandler = (reactiveStreamsPresent ? new ReactiveCachingHandler() : null);
}
/**
* Configure this aspect with the given error handler, key generator and cache resolver/manager
* suppliers, applying the corresponding default if a supplier is not resolvable.
@@ -371,41 +389,25 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
@Nullable
private Object execute(final CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) {
// Special handling of synchronized invocation
private Object execute(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) {
if (contexts.isSynchronized()) {
CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next();
if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) {
Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT);
Cache cache = context.getCaches().iterator().next();
try {
return wrapCacheValue(method, handleSynchronizedGet(invoker, key, cache));
}
catch (Cache.ValueRetrievalException ex) {
// Directly propagate ThrowableWrapper from the invoker,
// or potentially also an IllegalArgumentException etc.
ReflectionUtils.rethrowRuntimeException(ex.getCause());
}
}
else {
// No caching required, just call the underlying method
return invokeOperation(invoker);
}
// Special handling of synchronized invocation
return executeSynchronized(invoker, method, contexts);
}
// Process any early evictions
processCacheEvicts(contexts.get(CacheEvictOperation.class), true,
CacheOperationExpressionEvaluator.NO_RESULT);
// Check if we have a cached item matching the conditions
Cache.ValueWrapper cacheHit = findCachedItem(contexts.get(CacheableOperation.class));
// Check if we have a cached value matching the conditions
Object cacheHit = findCachedValue(contexts.get(CacheableOperation.class));
Object cacheValue;
Object returnValue;
if (cacheHit != null && !hasCachePut(contexts)) {
// If there are no put requests, just use the cache hit
cacheValue = cacheHit.get();
cacheValue = (cacheHit instanceof Cache.ValueWrapper wrapper ? wrapper.get() : cacheHit);
returnValue = wrapCacheValue(method, cacheValue);
}
else {
@@ -414,8 +416,8 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
cacheValue = unwrapReturnValue(returnValue);
}
// Collect puts from any @Cacheable miss, if no cached item is found
List<CachePutRequest> cachePutRequests = new ArrayList<>();
// Collect puts from any @Cacheable miss, if no cached value is found
List<CachePutRequest> cachePutRequests = new ArrayList<>(1);
if (cacheHit == null) {
collectPutRequests(contexts.get(CacheableOperation.class), cacheValue, cachePutRequests);
}
@@ -425,29 +427,52 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
// Process any collected put requests, either from @CachePut or a @Cacheable miss
for (CachePutRequest cachePutRequest : cachePutRequests) {
cachePutRequest.apply(cacheValue);
Object returnOverride = cachePutRequest.apply(cacheValue);
if (returnOverride != null) {
returnValue = returnOverride;
}
}
// Process any late evictions
processCacheEvicts(contexts.get(CacheEvictOperation.class), false, cacheValue);
Object returnOverride = processCacheEvicts(
contexts.get(CacheEvictOperation.class), false, returnValue);
if (returnOverride != null) {
returnValue = returnOverride;
}
return returnValue;
}
@Nullable
private Object handleSynchronizedGet(CacheOperationInvoker invoker, Object key, Cache cache) {
InvocationAwareResult invocationResult = new InvocationAwareResult();
Object result = cache.get(key, () -> {
invocationResult.invoked = true;
if (logger.isTraceEnabled()) {
logger.trace("No cache entry for key '" + key + "' in cache " + cache.getName());
private Object executeSynchronized(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) {
CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next();
if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) {
Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT);
Cache cache = context.getCaches().iterator().next();
if (CompletableFuture.class.isAssignableFrom(method.getReturnType())) {
return cache.retrieve(key, () -> (CompletableFuture<?>) invokeOperation(invoker));
}
if (this.reactiveCachingHandler != null) {
Object returnValue = this.reactiveCachingHandler.executeSynchronized(invoker, method, cache, key);
if (returnValue != ReactiveCachingHandler.NOT_HANDLED) {
return returnValue;
}
}
try {
return wrapCacheValue(method, cache.get(key, () -> unwrapReturnValue(invokeOperation(invoker))));
}
catch (Cache.ValueRetrievalException ex) {
// Directly propagate ThrowableWrapper from the invoker,
// or potentially also an IllegalArgumentException etc.
ReflectionUtils.rethrowRuntimeException(ex.getCause());
// Never reached
return null;
}
return unwrapReturnValue(invokeOperation(invoker));
});
if (!invocationResult.invoked && logger.isTraceEnabled()) {
logger.trace("Cache entry for key '" + key + "' found in cache '" + cache.getName() + "'");
}
return result;
else {
// No caching required, just call the underlying method
return invokeOperation(invoker);
}
}
@Nullable
@@ -467,7 +492,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
private boolean hasCachePut(CacheOperationContexts contexts) {
// Evaluate the conditions *without* the result object because we don't have it yet...
Collection<CacheOperationContext> cachePutContexts = contexts.get(CachePutOperation.class);
Collection<CacheOperationContext> excluded = new ArrayList<>();
Collection<CacheOperationContext> excluded = new ArrayList<>(1);
for (CacheOperationContext context : cachePutContexts) {
try {
if (!context.isConditionPassing(CacheOperationExpressionEvaluator.RESULT_UNAVAILABLE)) {
@@ -482,32 +507,55 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
return (cachePutContexts.size() != excluded.size());
}
private void processCacheEvicts(
Collection<CacheOperationContext> contexts, boolean beforeInvocation, @Nullable Object result) {
@Nullable
private Object processCacheEvicts(Collection<CacheOperationContext> contexts, boolean beforeInvocation,
@Nullable Object result) {
for (CacheOperationContext context : contexts) {
CacheEvictOperation operation = (CacheEvictOperation) context.metadata.operation;
if (beforeInvocation == operation.isBeforeInvocation() && isConditionPassing(context, result)) {
performCacheEvict(context, operation, result);
if (contexts.isEmpty()) {
return null;
}
List<CacheOperationContext> applicable = contexts.stream()
.filter(context -> (context.metadata.operation instanceof CacheEvictOperation evict &&
beforeInvocation == evict.isBeforeInvocation())).toList();
if (applicable.isEmpty()) {
return null;
}
if (result instanceof CompletableFuture<?> future) {
return future.whenComplete((value, ex) -> {
if (ex == null) {
performCacheEvicts(applicable, result);
}
});
}
if (this.reactiveCachingHandler != null) {
Object returnValue = this.reactiveCachingHandler.processCacheEvicts(applicable, result);
if (returnValue != ReactiveCachingHandler.NOT_HANDLED) {
return returnValue;
}
}
performCacheEvicts(applicable, result);
return null;
}
private void performCacheEvict(
CacheOperationContext context, CacheEvictOperation operation, @Nullable Object result) {
Object key = null;
for (Cache cache : context.getCaches()) {
if (operation.isCacheWide()) {
logInvalidating(context, operation, null);
doClear(cache, operation.isBeforeInvocation());
}
else {
if (key == null) {
key = generateKey(context, result);
private void performCacheEvicts(List<CacheOperationContext> contexts, @Nullable Object result) {
for (CacheOperationContext context : contexts) {
CacheEvictOperation operation = (CacheEvictOperation) context.metadata.operation;
if (isConditionPassing(context, result)) {
Object key = null;
for (Cache cache : context.getCaches()) {
if (operation.isCacheWide()) {
logInvalidating(context, operation, null);
doClear(cache, operation.isBeforeInvocation());
}
else {
if (key == null) {
key = generateKey(context, result);
}
logInvalidating(context, operation, key);
doEvict(cache, key, operation.isBeforeInvocation());
}
}
logInvalidating(context, operation, key);
doEvict(cache, key, operation.isBeforeInvocation());
}
}
}
@@ -520,19 +568,21 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
/**
* Find a cached item only for {@link CacheableOperation} that passes the condition.
* Find a cached value only for {@link CacheableOperation} that passes the condition.
* @param contexts the cacheable operations
* @return a {@link Cache.ValueWrapper} holding the cached item,
* @return a {@link Cache.ValueWrapper} holding the cached value,
* or {@code null} if none is found
*/
@Nullable
private Cache.ValueWrapper findCachedItem(Collection<CacheOperationContext> contexts) {
Object result = CacheOperationExpressionEvaluator.NO_RESULT;
private Object findCachedValue(Collection<CacheOperationContext> contexts) {
for (CacheOperationContext context : contexts) {
if (isConditionPassing(context, result)) {
Object key = generateKey(context, result);
Cache.ValueWrapper cached = findInCaches(context, key);
if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) {
Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT);
Object cached = findInCaches(context, key);
if (cached != null) {
if (logger.isTraceEnabled()) {
logger.trace("Cache entry for key '" + key + "' found in cache(s) " + context.getCacheNames());
}
return cached;
}
else {
@@ -547,9 +597,9 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
/**
* Collect the {@link CachePutRequest} for all {@link CacheOperation} using
* the specified result item.
* the specified result value.
* @param contexts the contexts to handle
* @param result the result item (never {@code null})
* @param result the result value (never {@code null})
* @param putRequests the collection to update
*/
private void collectPutRequests(Collection<CacheOperationContext> contexts,
@@ -564,15 +614,18 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
@Nullable
private Cache.ValueWrapper findInCaches(CacheOperationContext context, Object key) {
private Object findInCaches(CacheOperationContext context, Object key) {
for (Cache cache : context.getCaches()) {
Cache.ValueWrapper wrapper = doGet(cache, key);
if (wrapper != null) {
if (logger.isTraceEnabled()) {
logger.trace("Cache entry for key '" + key + "' found in cache '" + cache.getName() + "'");
}
return wrapper;
if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) {
return cache.retrieve(key);
}
if (this.reactiveCachingHandler != null) {
Object returnValue = this.reactiveCachingHandler.findInCaches(context, cache, key);
if (returnValue != ReactiveCachingHandler.NOT_HANDLED) {
return returnValue;
}
}
return doGet(cache, key);
}
return null;
}
@@ -625,13 +678,13 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
private boolean determineSyncFlag(Method method) {
List<CacheOperationContext> cacheOperationContexts = this.contexts.get(CacheableOperation.class);
if (cacheOperationContexts == null) { // no @Cacheable operation at all
List<CacheOperationContext> cacheableContexts = this.contexts.get(CacheableOperation.class);
if (cacheableContexts == null) { // no @Cacheable operation at all
return false;
}
boolean syncEnabled = false;
for (CacheOperationContext cacheOperationContext : cacheOperationContexts) {
if (((CacheableOperation) cacheOperationContext.getOperation()).isSync()) {
for (CacheOperationContext context : cacheableContexts) {
if (context.getOperation() instanceof CacheableOperation cacheable && cacheable.isSync()) {
syncEnabled = true;
break;
}
@@ -641,13 +694,13 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
throw new IllegalStateException(
"A sync=true operation cannot be combined with other cache operations on '" + method + "'");
}
if (cacheOperationContexts.size() > 1) {
if (cacheableContexts.size() > 1) {
throw new IllegalStateException(
"Only one sync=true operation is allowed on '" + method + "'");
}
CacheOperationContext cacheOperationContext = cacheOperationContexts.iterator().next();
CacheOperation operation = cacheOperationContext.getOperation();
if (cacheOperationContext.getCaches().size() > 1) {
CacheOperationContext cacheableContext = cacheableContexts.iterator().next();
CacheOperation operation = cacheableContext.getOperation();
if (cacheableContext.getCaches().size() > 1) {
throw new IllegalStateException(
"A sync=true operation is restricted to a single cache on '" + operation + "'");
}
@@ -720,7 +773,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
this.args = extractArgs(metadata.method, args);
this.target = target;
this.caches = CacheAspectSupport.this.getCaches(this, metadata.cacheResolver);
this.cacheNames = createCacheNames(this.caches);
this.cacheNames = prepareCacheNames(this.caches);
}
@Override
@@ -808,8 +861,8 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
return this.cacheNames;
}
private Collection<String> createCacheNames(Collection<? extends Cache> caches) {
Collection<String> names = new ArrayList<>();
private Collection<String> prepareCacheNames(Collection<? extends Cache> caches) {
Collection<String> names = new ArrayList<>(caches.size());
for (Cache cache : caches) {
names.add(cache.getName());
}
@@ -818,25 +871,6 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
private class CachePutRequest {
private final CacheOperationContext context;
private final Object key;
public CachePutRequest(CacheOperationContext context, Object key) {
this.context = context;
this.key = key;
}
public void apply(@Nullable Object result) {
for (Cache cache : this.context.getCaches()) {
doPut(cache, this.key, result);
}
}
}
private static final class CacheOperationCacheKey implements Comparable<CacheOperationCacheKey> {
private final CacheOperation cacheOperation;
@@ -876,12 +910,168 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
/**
* Internal holder class for recording that a cache method was invoked.
*/
private static class InvocationAwareResult {
private class CachePutRequest {
boolean invoked;
private final CacheOperationContext context;
private final Object key;
public CachePutRequest(CacheOperationContext context, Object key) {
this.context = context;
this.key = key;
}
@Nullable
public Object apply(@Nullable Object result) {
if (result instanceof CompletableFuture<?> future) {
return future.whenComplete((value, ex) -> {
if (ex != null) {
performEvict(ex);
}
else {
performPut(value);
}
});
}
if (reactiveCachingHandler != null) {
Object returnValue = reactiveCachingHandler.processPutRequest(this, result);
if (returnValue != ReactiveCachingHandler.NOT_HANDLED) {
return returnValue;
}
}
performPut(result);
return null;
}
void performPut(@Nullable Object value) {
if (logger.isTraceEnabled()) {
logger.trace("Creating cache entry for key '" + this.key + "' in cache(s) " +
this.context.getCacheNames());
}
for (Cache cache : this.context.getCaches()) {
doPut(cache, this.key, value);
}
}
void performEvict(Throwable cause) {
if (logger.isTraceEnabled()) {
logger.trace("Removing cache entry for key '" + this.key + "' from cache(s) " +
this.context.getCacheNames() + " due to exception: " + cause);
}
for (Cache cache : this.context.getCaches()) {
doEvict(cache, this.key, false);
}
}
}
/**
* Reactive Streams Subscriber collection for collecting a List to cache.
*/
private class CachePutListSubscriber implements Subscriber<Object> {
private final CachePutRequest request;
private final List<Object> cacheValue = new ArrayList<>();
public CachePutListSubscriber(CachePutRequest request) {
this.request = request;
}
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
this.cacheValue.add(o);
}
@Override
public void onError(Throwable t) {
this.request.performEvict(t);
}
@Override
public void onComplete() {
this.request.performPut(this.cacheValue);
}
}
/**
* Inner class to avoid a hard dependency on the Reactive Streams API at runtime.
*/
private class ReactiveCachingHandler {
public static final Object NOT_HANDLED = new Object();
private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
@Nullable
public Object executeSynchronized(CacheOperationInvoker invoker, Method method, Cache cache, Object key) {
ReactiveAdapter adapter = this.registry.getAdapter(method.getReturnType());
if (adapter != null) {
if (adapter.isMultiValue()) {
// Flux or similar
return adapter.fromPublisher(Flux.from(Mono.fromFuture(
cache.retrieve(key,
() -> Flux.from(adapter.toPublisher(invokeOperation(invoker))).collectList().toFuture())))
.flatMap(Flux::fromIterable));
}
else {
// Mono or similar
return adapter.fromPublisher(Mono.fromFuture(
cache.retrieve(key,
() -> Mono.from(adapter.toPublisher(invokeOperation(invoker))).toFuture())));
}
}
return NOT_HANDLED;
}
@Nullable
public Object processCacheEvicts(List<CacheOperationContext> contexts, @Nullable Object result) {
ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
if (adapter != null) {
return adapter.fromPublisher(Mono.from(adapter.toPublisher(result))
.doOnSuccess(value -> performCacheEvicts(contexts, result)));
}
return NOT_HANDLED;
}
@Nullable
public Object findInCaches(CacheOperationContext context, Cache cache, Object key) {
ReactiveAdapter adapter = this.registry.getAdapter(context.getMethod().getReturnType());
if (adapter != null) {
CompletableFuture<?> cachedFuture = cache.retrieve(key);
if (cachedFuture == null) {
return null;
}
if (adapter.isMultiValue()) {
return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture))
.flatMap(v -> (v instanceof Iterable<?> iv ? Flux.fromIterable(iv) : Flux.just(v))));
}
else {
return adapter.fromPublisher(Mono.fromFuture(cachedFuture));
}
}
return NOT_HANDLED;
}
@Nullable
public Object processPutRequest(CachePutRequest request, @Nullable Object result) {
ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
if (adapter != null) {
if (adapter.isMultiValue()) {
Flux<?> source = Flux.from(adapter.toPublisher(result));
source.subscribe(new CachePutListSubscriber(request));
return adapter.fromPublisher(source);
}
else {
return adapter.fromPublisher(Mono.from(adapter.toPublisher(result))
.doOnSuccess(request::performPut).doOnError(request::performEvict));
}
}
return NOT_HANDLED;
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,8 @@
package org.springframework.cache.support;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.springframework.cache.Cache;
import org.springframework.lang.Nullable;
@@ -80,6 +82,17 @@ public class NoOpCache implements Cache {
}
}
@Override
@Nullable
public CompletableFuture<?> retrieve(Object key) {
return null;
}
@Override
public <T> CompletableFuture<T> retrieve(Object key, Supplier<CompletableFuture<T>> valueLoader) {
return valueLoader.get();
}
@Override
public void put(Object key, @Nullable Object value) {
}