Consistently support CompletionStage in ReactiveAdapterRegistry
Aligns ReactiveAdapterRegistry with MVC/messaging handler methods in terms of recognizing CompletionStage as well as CompletableFuture. Includes consistent use of ReactiveAdapter for reactive transactions. Closes gh-23011
This commit is contained in:
@@ -21,6 +21,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.function.Function;
|
||||
|
||||
import io.reactivex.BackpressureStrategy;
|
||||
@@ -221,12 +222,8 @@ public class ReactiveAdapterRegistry {
|
||||
source -> source);
|
||||
|
||||
registry.registerReactiveType(
|
||||
ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> {
|
||||
CompletableFuture<?> empty = new CompletableFuture<>();
|
||||
empty.complete(null);
|
||||
return empty;
|
||||
}),
|
||||
source -> Mono.fromFuture((CompletableFuture<?>) source),
|
||||
ReactiveTypeDescriptor.singleOptionalValue(CompletionStage.class, EmptyCompletableFuture::new),
|
||||
source -> Mono.fromCompletionStage((CompletionStage<?>) source),
|
||||
source -> Mono.from(source).toFuture()
|
||||
);
|
||||
}
|
||||
@@ -338,17 +335,28 @@ public class ReactiveAdapterRegistry {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class EmptyCompletableFuture<T> extends CompletableFuture<T> {
|
||||
|
||||
EmptyCompletableFuture() {
|
||||
complete(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class CoroutinesRegistrar {
|
||||
|
||||
@SuppressWarnings("KotlinInternalInJava")
|
||||
void registerAdapters(ReactiveAdapterRegistry registry) {
|
||||
registry.registerReactiveType(
|
||||
ReactiveTypeDescriptor.singleOptionalValue(Deferred.class, () -> CompletableDeferredKt.CompletableDeferred(null)),
|
||||
ReactiveTypeDescriptor.singleOptionalValue(Deferred.class,
|
||||
() -> CompletableDeferredKt.CompletableDeferred(null)),
|
||||
source -> CoroutinesUtils.deferredToMono((Deferred<?>) source),
|
||||
source -> CoroutinesUtils.monoToDeferred(Mono.from(source)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class CoroutinesFlowRegistrar {
|
||||
|
||||
void registerAdapters(ReactiveAdapterRegistry registry) {
|
||||
|
||||
@@ -834,16 +834,13 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||
return TransactionContextManager.currentContext().flatMap(context ->
|
||||
createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMap(it -> {
|
||||
try {
|
||||
// This is an around advice: Invoke the next interceptor in the chain.
|
||||
// This will normally result in a target object being invoked.
|
||||
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
|
||||
// through usingWhen.
|
||||
// Need re-wrapping until we get hold of the exception through usingWhen.
|
||||
return Mono.<Object, ReactiveTransactionInfo>usingWhen(Mono.just(it), txInfo -> {
|
||||
try {
|
||||
return (Mono<?>) invocation.proceedWithInvocation();
|
||||
}
|
||||
catch (Throwable throwable) {
|
||||
return Mono.error(throwable);
|
||||
catch (Throwable ex) {
|
||||
return Mono.error(ex);
|
||||
}
|
||||
}, this::commitTransactionAfterReturning, txInfo -> Mono.empty())
|
||||
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
|
||||
@@ -856,19 +853,17 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());
|
||||
}
|
||||
|
||||
return TransactionContextManager.currentContext().flatMapMany(context ->
|
||||
// Any other reactive type, typically a Flux
|
||||
return this.adapter.fromPublisher(TransactionContextManager.currentContext().flatMapMany(context ->
|
||||
createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMapMany(it -> {
|
||||
try {
|
||||
// This is an around advice: Invoke the next interceptor in the chain.
|
||||
// This will normally result in a target object being invoked.
|
||||
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
|
||||
// through usingWhen.
|
||||
// Need re-wrapping until we get hold of the exception through usingWhen.
|
||||
return Flux.usingWhen(Mono.just(it), txInfo -> {
|
||||
try {
|
||||
return this.adapter.toPublisher(invocation.proceedWithInvocation());
|
||||
}
|
||||
catch (Throwable throwable) {
|
||||
return Mono.error(throwable);
|
||||
catch (Throwable ex) {
|
||||
return Mono.error(ex);
|
||||
}
|
||||
}, this::commitTransactionAfterReturning, txInfo -> Mono.empty())
|
||||
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
|
||||
@@ -878,7 +873,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
|
||||
}
|
||||
})).subscriberContext(TransactionContextManager.getOrCreateContext())
|
||||
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());
|
||||
.subscriberContext(TransactionContextManager.getOrCreateContextHolder()));
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
||||
Reference in New Issue
Block a user