Improve fix of duplicate upstream subscription during reactive cache put

This commit fixes an issue where a Cacheable method which returns a
Flux (or multi-value publisher) will be invoked once, but the returned
publisher is actually subscribed twice.

The previous fix 988f3630c would cause the cached elements to depend on
the first usage pattern / request pattern, which is likely to be too
confusing to users. This fix reintroduces the notion of exhausting the
original Flux by having a second subscriber dedicated to that, but uses
`refCount(2)` to ensure that the original `Flux` returned by the cached
method is still only subscribed once.

Closes gh-32370
This commit is contained in:
Simon Baslé
2024-03-07 11:13:55 +01:00
parent c1d4b610ca
commit 6d9a2eb9b8
3 changed files with 74 additions and 34 deletions

View File

@@ -26,12 +26,12 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.observability.DefaultSignalListener;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -90,7 +90,6 @@ import org.springframework.util.function.SupplierUtils;
* @author Sam Brannen
* @author Stephane Nicoll
* @author Sebastien Deleuze
* @author Simon Baslé
* @since 3.1
*/
public abstract class CacheAspectSupport extends AbstractCacheInvoker
@@ -1037,45 +1036,34 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
/**
* Reactor stateful SignalListener for collecting a List to cache.
* Reactive Streams Subscriber for exhausting the Flux and collecting a List
* to cache.
*/
private class CachePutSignalListener extends DefaultSignalListener<Object> {
private final class CachePutListSubscriber implements Subscriber<Object> {
private final AtomicReference<CachePutRequest> request;
private final CachePutRequest request;
private final List<Object> cacheValue = new ArrayList<>();
public CachePutSignalListener(CachePutRequest request) {
this.request = new AtomicReference<>(request);
public CachePutListSubscriber(CachePutRequest request) {
this.request = request;
}
@Override
public void doOnNext(Object o) {
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
this.cacheValue.add(o);
}
@Override
public void doOnComplete() {
CachePutRequest r = this.request.get();
if (this.request.compareAndSet(r, null)) {
r.performCachePut(this.cacheValue);
}
public void onError(Throwable t) {
this.cacheValue.clear();
}
@Override
public void doOnCancel() {
// Note: we don't use doFinally as we want to propagate the signal after cache put, not before
CachePutRequest r = this.request.get();
if (this.request.compareAndSet(r, null)) {
r.performCachePut(this.cacheValue);
}
}
@Override
public void doOnError(Throwable error) {
if (this.request.getAndSet(null) != null) {
this.cacheValue.clear();
}
public void onComplete() {
this.request.performCachePut(this.cacheValue);
}
}
@@ -1159,8 +1147,10 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
if (adapter != null) {
if (adapter.isMultiValue()) {
return adapter.fromPublisher(Flux.from(adapter.toPublisher(result))
.tap(() -> new CachePutSignalListener(request)));
Flux<?> source = Flux.from(adapter.toPublisher(result))
.publish().refCount(2);
source.subscribe(new CachePutListSubscriber(request));
return adapter.fromPublisher(source);
}
else {
return adapter.fromPublisher(Mono.from(adapter.toPublisher(result))