Improved reactive type descriptor support

ReactiveTypeDescriptor is now a top-level type with named constructors.
It also contains and provides a getter for the actual type.

Issue: SPR-14902
This commit is contained in:
Rossen Stoyanchev
2016-11-24 15:56:50 -05:00
parent 4f37fdb133
commit 2e7d16df04
5 changed files with 184 additions and 102 deletions

View File

@@ -37,7 +37,7 @@ public interface ReactiveAdapter {
/**
* Return a descriptor with further information about the adaptee.
*/
Descriptor getDescriptor();
ReactiveTypeDescriptor getDescriptor();
/**
* Adapt the given Object to a {@link Mono}
@@ -67,48 +67,4 @@ public interface ReactiveAdapter {
*/
Object fromPublisher(Publisher<?> publisher);
/**
* A descriptor with information about the adaptee stream semantics.
*/
class Descriptor {
private final boolean isMultiValue;
private final boolean supportsEmpty;
private final boolean isNoValue;
public Descriptor(boolean isMultiValue, boolean canBeEmpty, boolean isNoValue) {
this.isMultiValue = isMultiValue;
this.supportsEmpty = canBeEmpty;
this.isNoValue = isNoValue;
}
/**
* Return {@code true} if the adaptee implies 0..N values can be produced
* and is therefore a good fit to adapt to {@link Flux}. A {@code false}
* return value implies the adaptee will produce 1 value at most and is
* therefore a good fit for {@link Mono}.
*/
public boolean isMultiValue() {
return this.isMultiValue;
}
/**
* Return {@code true} if the adaptee can complete without values.
*/
public boolean supportsEmpty() {
return this.supportsEmpty;
}
/**
* Return {@code true} if the adaptee implies no values will be produced,
* i.e. providing only completion or error signal.
*/
public boolean isNoValue() {
return this.isNoValue;
}
}
}

View File

@@ -16,8 +16,8 @@
package org.springframework.core;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@@ -29,19 +29,17 @@ import io.reactivex.Maybe;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Completable;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Single;
import org.springframework.util.ClassUtils;
/**
* A registry of adapters to adapt to {@link Flux} and {@link Mono}.
*
* <p>By default there are adapters for {@link CompletableFuture}, RxJava 1, and
* also for a any Reactive Streams {@link Publisher}. Additional adapters can be
* registered via {@link #registerFluxAdapter} and {@link #registerMonoAdapter}.
* <p>By default, depending on classpath availability, adapters are registered
* for RxJava 1, RxJava 2 types, and {@link CompletableFuture}. In addition the
* registry contains adapters for Reactor's own Flux and Mono types (no-op)
* along with adaption for any other Reactive Streams {@link Publisher}.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
@@ -58,25 +56,30 @@ public class ReactiveAdapterRegistry {
private static final boolean rxJava2Present =
ClassUtils.isPresent("io.reactivex.Flowable", ReactiveAdapterRegistry.class.getClassLoader());
private final Map<Class<?>, ReactiveAdapter> adapterMap = new LinkedHashMap<>(4);
private final List<ReactiveAdapter> adapters = new ArrayList<>(32);
/**
* Create a registry and auto-register default adapters.
*/
public ReactiveAdapterRegistry() {
// Flux and Mono ahead of Publisher...
registerMonoAdapter(Mono.class,
source -> (Mono<?>) source, source -> source,
new ReactiveAdapter.Descriptor(false, true, false));
registerFluxAdapter(
Flux.class, source -> (Flux<?>) source, source -> source);
registerFluxAdapter(
Publisher.class, source -> Flux.from((Publisher<?>) source), source -> source);
ReactiveTypeDescriptor.singleOptionalValue(Mono.class));
registerFluxAdapter(Flux.class,
source -> (Flux<?>) source, source -> source);
registerFluxAdapter(Publisher.class,
source -> Flux.from((Publisher<?>) source), source -> source);
registerMonoAdapter(CompletableFuture.class,
source -> Mono.fromFuture((CompletableFuture<?>) source), Mono::toFuture,
new ReactiveAdapter.Descriptor(false, true, false)
ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class)
);
if (rxJava1Present && rxJava1Adapter) {
@@ -93,10 +96,10 @@ public class ReactiveAdapterRegistry {
* <p>The provided functions can assume that input will never be {@code null}
* and also that any {@link Optional} wrapper is unwrapped.
*/
public void registerMonoAdapter(Class<?> adapteeType, Function<Object, Mono<?>> toAdapter,
Function<Mono<?>, Object> fromAdapter, ReactiveAdapter.Descriptor descriptor) {
public void registerMonoAdapter(Class<?> reactiveType, Function<Object, Mono<?>> toAdapter,
Function<Mono<?>, Object> fromAdapter, ReactiveTypeDescriptor descriptor) {
this.adapterMap.put(adapteeType, new MonoReactiveAdapter(toAdapter, fromAdapter, descriptor));
this.adapters.add(new MonoReactiveAdapter(toAdapter, fromAdapter, descriptor));
}
/**
@@ -104,57 +107,57 @@ public class ReactiveAdapterRegistry {
* <p>The provided functions can assume that input will never be {@code null}
* and also that any {@link Optional} wrapper is unwrapped.
*/
public void registerFluxAdapter(Class<?> adapteeType, Function<Object, Flux<?>> toAdapter,
public void registerFluxAdapter(Class<?> reactiveType, Function<Object, Flux<?>> toAdapter,
Function<Flux<?>, Object> fromAdapter) {
this.adapterMap.put(adapteeType, new FluxReactiveAdapter(toAdapter, fromAdapter));
this.adapters.add(new FluxReactiveAdapter(toAdapter, fromAdapter,
ReactiveTypeDescriptor.multiValue(reactiveType)));
}
/**
* Get the adapter for the given adaptee type to adapt from.
* Get the adapter for the given reactive type to adapt from.
*/
public ReactiveAdapter getAdapterFrom(Class<?> adapteeType) {
return getAdapterFrom(adapteeType, null);
public ReactiveAdapter getAdapterFrom(Class<?> reactiveType) {
return getAdapterFrom(reactiveType, null);
}
/**
* Get the adapter for the given adaptee type to adapt from.
* Get the adapter for the given reactive type to adapt from.
* If the instance is not {@code null} its actual type is used to check.
*/
public ReactiveAdapter getAdapterFrom(Class<?> adapteeType, Object adaptee) {
Class<?> actualType = getActualType(adapteeType, adaptee);
public ReactiveAdapter getAdapterFrom(Class<?> reactiveType, Object adaptee) {
Class<?> actualType = getActualType(reactiveType, adaptee);
return getAdapterInternal(supportedType -> supportedType.isAssignableFrom(actualType));
}
/**
* Get the adapter for the given adaptee type to adapt to.
* Get the adapter for the given reactive type to adapt to.
*/
public ReactiveAdapter getAdapterTo(Class<?> adapteeType) {
return getAdapterTo(adapteeType, null);
public ReactiveAdapter getAdapterTo(Class<?> reactiveType) {
return getAdapterTo(reactiveType, null);
}
/**
* Get the adapter for the given adaptee type to adapt to.
* Get the adapter for the given reactive type to adapt to.
* If the instance is not {@code null} its actual type is used to check.
*/
public ReactiveAdapter getAdapterTo(Class<?> adapteeType, Object adaptee) {
Class<?> actualType = getActualType(adapteeType, adaptee);
public ReactiveAdapter getAdapterTo(Class<?> reactiveType, Object adaptee) {
Class<?> actualType = getActualType(reactiveType, adaptee);
return getAdapterInternal(supportedType -> supportedType.equals(actualType));
}
private ReactiveAdapter getAdapterInternal(Predicate<Class<?>> adapteeTypePredicate) {
return this.adapterMap.keySet().stream()
.filter(adapteeTypePredicate)
.map(this.adapterMap::get)
private ReactiveAdapter getAdapterInternal(Predicate<Class<?>> predicate) {
return this.adapters.stream()
.filter(adapter -> predicate.test(adapter.getDescriptor().getReactiveType()))
.findFirst()
.orElse(null);
}
private static Class<?> getActualType(Class<?> adapteeType, Object adaptee) {
private static Class<?> getActualType(Class<?> reactiveType, Object adaptee) {
adaptee = unwrapOptional(adaptee);
return (adaptee != null ? adaptee.getClass() : adapteeType);
return (adaptee != null ? adaptee.getClass() : reactiveType);
}
private static Object unwrapOptional(Object value) {
@@ -169,17 +172,19 @@ public class ReactiveAdapterRegistry {
private final Function<Mono<?>, Object> fromAdapter;
private final Descriptor descriptor;
private final ReactiveTypeDescriptor descriptor;
MonoReactiveAdapter(Function<Object, Mono<?>> to, Function<Mono<?>, Object> from, Descriptor descriptor) {
MonoReactiveAdapter(Function<Object, Mono<?>> to, Function<Mono<?>, Object> from,
ReactiveTypeDescriptor descriptor) {
this.toAdapter = to;
this.fromAdapter = from;
this.descriptor = descriptor;
}
@Override
public Descriptor getDescriptor() {
public ReactiveTypeDescriptor getDescriptor() {
return this.descriptor;
}
@@ -219,16 +224,19 @@ public class ReactiveAdapterRegistry {
private final Function<Flux<?>, Object> fromAdapter;
private final Descriptor descriptor = new Descriptor(true, true, false);
private final ReactiveTypeDescriptor descriptor;
FluxReactiveAdapter(Function<Object, Flux<?>> to, Function<Flux<?>, Object> from) {
FluxReactiveAdapter(Function<Object, Flux<?>> to, Function<Flux<?>, Object> from,
ReactiveTypeDescriptor descriptor) {
this.descriptor = descriptor;
this.toAdapter = to;
this.fromAdapter = from;
}
@Override
public Descriptor getDescriptor() {
public ReactiveTypeDescriptor getDescriptor() {
return this.descriptor;
}
@@ -265,19 +273,19 @@ public class ReactiveAdapterRegistry {
private static class RxJava1AdapterRegistrar {
public void register(ReactiveAdapterRegistry registry) {
registry.registerFluxAdapter(Observable.class,
source -> Flux.from(RxReactiveStreams.toPublisher((Observable<?>) source)),
registry.registerFluxAdapter(rx.Observable.class,
source -> Flux.from(RxReactiveStreams.toPublisher((rx.Observable<?>) source)),
RxReactiveStreams::toObservable
);
registry.registerMonoAdapter(Single.class,
source -> Mono.from(RxReactiveStreams.toPublisher((Single<?>) source)),
registry.registerMonoAdapter(rx.Single.class,
source -> Mono.from(RxReactiveStreams.toPublisher((rx.Single<?>) source)),
RxReactiveStreams::toSingle,
new ReactiveAdapter.Descriptor(false, false, false)
ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class)
);
registry.registerMonoAdapter(Completable.class,
source -> Mono.from(RxReactiveStreams.toPublisher((Completable) source)),
registry.registerMonoAdapter(rx.Completable.class,
source -> Mono.from(RxReactiveStreams.toPublisher((rx.Completable) source)),
RxReactiveStreams::toCompletable,
new ReactiveAdapter.Descriptor(false, true, true)
ReactiveTypeDescriptor.noValue(rx.Completable.class)
);
}
}
@@ -296,17 +304,17 @@ public class ReactiveAdapterRegistry {
registry.registerMonoAdapter(io.reactivex.Single.class,
source -> Mono.from(((io.reactivex.Single<?>) source).toFlowable()),
source -> Flowable.fromPublisher(source).toObservable().singleElement().toSingle(),
new ReactiveAdapter.Descriptor(false, false, false)
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class)
);
registry.registerMonoAdapter(Maybe.class,
source -> Mono.from(((Maybe<?>) source).toFlowable()),
source -> Flowable.fromPublisher(source).toObservable().singleElement(),
new ReactiveAdapter.Descriptor(false, true, false)
ReactiveTypeDescriptor.singleOptionalValue(Maybe.class)
);
registry.registerMonoAdapter(io.reactivex.Completable.class,
source -> Mono.from(((io.reactivex.Completable) source).toFlowable()),
source -> Flowable.fromPublisher(source).toObservable().ignoreElements(),
new ReactiveAdapter.Descriptor(false, true, true)
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class)
);
}
}

View File

@@ -0,0 +1,116 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.util.Assert;
/**
* Descriptor for a reactive type with information its stream semantics, i.e.
* how many values it can produce.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class ReactiveTypeDescriptor {
private final Class<?> reactiveType;
private final boolean multiValue;
private final boolean supportsEmpty;
private final boolean noValue;
/**
* Private constructor. See static factory methods.
*/
private ReactiveTypeDescriptor(Class<?> reactiveType, boolean multiValue,
boolean canBeEmpty, boolean noValue) {
Assert.notNull(reactiveType, "'reactiveType' must not be null");
this.reactiveType = reactiveType;
this.multiValue = multiValue;
this.supportsEmpty = canBeEmpty;
this.noValue = noValue;
}
/**
* Return the reactive type the descriptor was created for.
*/
public Class<?> getReactiveType() {
return this.reactiveType;
}
/**
* Return {@code true} if the reactive type can produce more than 1 value
* can be produced and is therefore a good fit to adapt to {@link Flux}.
* A {@code false} return value implies the reactive type can produce 1
* value at most and is therefore a good fit to adapt to {@link Mono}.
*/
public boolean isMultiValue() {
return this.multiValue;
}
/**
* Return {@code true} if the reactive type can complete with no values.
*/
public boolean supportsEmpty() {
return this.supportsEmpty;
}
/**
* Return {@code true} if the reactive type does not produce any values and
* only provides completion and error signals.
*/
public boolean isNoValue() {
return this.noValue;
}
/**
* Descriptor for a reactive type that can produce 0..N values.
*/
public static ReactiveTypeDescriptor multiValue(Class<?> reactiveType) {
return new ReactiveTypeDescriptor(reactiveType, true, true, false);
}
/**
* Descriptor for a reactive type that can produce 0..1 values.
*/
public static ReactiveTypeDescriptor singleOptionalValue(Class<?> reactiveType) {
return new ReactiveTypeDescriptor(reactiveType, false, true, false);
}
/**
* Descriptor for a reactive type that must produce 1 value to complete.
*/
public static ReactiveTypeDescriptor singleRequiredValue(Class<?> reactiveType) {
return new ReactiveTypeDescriptor(reactiveType, false, false, false);
}
/**
* Descriptor for a reactive type that does not produce any values.
*/
public static ReactiveTypeDescriptor noValue(Class<?> reactiveType) {
return new ReactiveTypeDescriptor(reactiveType, false, true, true);
}
}