Respect cache hit when empty Mono/Flux response is returned
Closes gh-31868
This commit is contained in:
@@ -503,6 +503,11 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
|
||||
private Object evaluate(@Nullable Object cacheHit, CacheOperationInvoker invoker, Method method,
|
||||
CacheOperationContexts contexts) {
|
||||
|
||||
// Re-invocation in reactive pipeline after late cache hit determination?
|
||||
if (contexts.processed) {
|
||||
return cacheHit;
|
||||
}
|
||||
|
||||
Object cacheValue;
|
||||
Object returnValue;
|
||||
|
||||
@@ -541,6 +546,9 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
|
||||
returnValue = returnOverride;
|
||||
}
|
||||
|
||||
// Mark as processed for re-invocation after late cache hit determination
|
||||
contexts.processed = true;
|
||||
|
||||
return returnValue;
|
||||
}
|
||||
|
||||
@@ -688,6 +696,8 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
|
||||
|
||||
private final boolean sync;
|
||||
|
||||
boolean processed;
|
||||
|
||||
public CacheOperationContexts(Collection<? extends CacheOperation> operations, Method method,
|
||||
Object[] args, Object target, Class<?> targetClass) {
|
||||
|
||||
@@ -1082,21 +1092,25 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
|
||||
return null;
|
||||
}
|
||||
if (adapter.isMultiValue()) {
|
||||
return adapter.fromPublisher(Flux.from(
|
||||
Mono.fromFuture(cachedFuture)
|
||||
.flatMap(value -> (Mono<?>) evaluate(Mono.justOrEmpty(unwrapCacheValue(value)), invoker, method, contexts)))
|
||||
.flatMap(v -> (v instanceof Iterable<?> iv ? Flux.fromIterable(iv) : Flux.just(v)))
|
||||
.switchIfEmpty(Flux.defer(() -> (Flux<?>) evaluate(null, invoker, method, contexts))));
|
||||
return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture))
|
||||
.switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts)))
|
||||
.flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts)));
|
||||
}
|
||||
else {
|
||||
return adapter.fromPublisher(Mono.fromFuture(cachedFuture)
|
||||
.flatMap(value -> (Mono<?>) evaluate(Mono.justOrEmpty(unwrapCacheValue(value)), invoker, method, contexts))
|
||||
.switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts))));
|
||||
.switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts)))
|
||||
.flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts)));
|
||||
}
|
||||
}
|
||||
return NOT_HANDLED;
|
||||
}
|
||||
|
||||
private Flux<?> valueToFlux(Object value, CacheOperationContexts contexts) {
|
||||
Object data = unwrapCacheValue(value);
|
||||
return (!contexts.processed && data instanceof Iterable<?> iterable ? Flux.fromIterable(iterable) :
|
||||
(data != null ? Flux.just(data) : Flux.empty()));
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public Object processPutRequest(CachePutRequest request, @Nullable Object result) {
|
||||
ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
|
||||
|
||||
Reference in New Issue
Block a user