Add ReactiveAdapterRegistry

Issue: SPR-14159
This commit is contained in:
Rossen Stoyanchev
2016-07-22 00:17:13 -04:00
parent 1a2ac8ea56
commit 101220bad1
29 changed files with 775 additions and 635 deletions

View File

@@ -0,0 +1,116 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Contract for adapting to and from {@link Flux} and {@link Mono}.
*
* <p>An adapter supports a specific adaptee type whose stream semantics can be
* checked via {@link #getDescriptor()}.
*
* <p>Use the {@link ReactiveAdapterRegistry} to obtain an adapter for a
* supported adaptee type or to register additional adapters.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public interface ReactiveAdapter {
/**
* Return a descriptor with further information about the adaptee.
*/
Descriptor getDescriptor();
/**
* Adapt the given Object to a {@link Mono}
* @param source the source object to adapt
* @return the resulting {@link Mono} possibly empty
*/
<T> Mono<T> toMono(Object source);
/**
* Adapt the given Object to a {@link Flux}.
* @param source the source object to adapt
* @return the resulting {@link Flux} possibly empty
*/
<T> Flux<T> toFlux(Object source);
/**
* Adapt the given Object to a Publisher.
* @param source the source object to adapt
* @return the resulting {@link Mono} or {@link Flux} possibly empty
*/
<T> Publisher<T> toPublisher(Object source);
/**
* Adapt the given Publisher to the target adaptee.
* @param publisher the publisher to adapt
* @return the resulting adaptee
*/
Object fromPublisher(Publisher<?> publisher);
/**
* A descriptor with information about the adaptee stream semantics.
*/
class Descriptor {
private final boolean isMultiValue;
private final boolean supportsEmpty;
private final boolean isNoValue;
public Descriptor(boolean isMultiValue, boolean canBeEmpty, boolean isNoValue) {
this.isMultiValue = isMultiValue;
this.supportsEmpty = canBeEmpty;
this.isNoValue = isNoValue;
}
/**
* Return {@code true} if the adaptee implies 0..N values can be produced
* and is therefore a good fit to adapt to {@link Flux}. A {@code false}
* return value implies the adaptee will produce 1 value at most and is
* therefore a good fit for {@link Mono}.
*/
public boolean isMultiValue() {
return this.isMultiValue;
}
/**
* Return {@code true} if the adaptee can complete without values.
*/
public boolean supportsEmpty() {
return this.supportsEmpty;
}
/**
* Return {@code true} if the adaptee implies no values will be produced,
* i.e. providing only completion or error signal.
*/
public boolean isNoValue() {
return this.isNoValue;
}
}
}

View File

@@ -0,0 +1,282 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Completable;
import rx.Observable;
import rx.Single;
import org.springframework.core.ReactiveAdapter.Descriptor;
import org.springframework.util.ClassUtils;
/**
* A registry of adapters to adapt to {@link Flux} and {@link Mono}.
*
* <p>By default there are adapters for {@link CompletableFuture}, RxJava 1, and
* also for a any Reactive Streams {@link Publisher}. Additional adapters can be
* registered via {@link #registerFluxAdapter) and {@link #registerMonoAdapter}.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class ReactiveAdapterRegistry {
private static final boolean rxJava1Present =
ClassUtils.isPresent("rx.Observable", ReactiveAdapterRegistry.class.getClassLoader());
private final Map<Class<?>, ReactiveAdapter> adapterMap = new LinkedHashMap<>();
/**
* Create a registry and auto-register default adapters.
*/
public ReactiveAdapterRegistry() {
// Flux and Mono ahead of Publisher...
registerMonoAdapter(Mono.class,
source -> (Mono<?>) source, source -> source, new Descriptor(false, true, false));
registerFluxAdapter(
Flux.class, source -> (Flux<?>) source, source -> source);
registerFluxAdapter(
Publisher.class, source -> Flux.from((Publisher<?>) source), source -> source);
registerMonoAdapter(CompletableFuture.class,
source -> Mono.fromFuture((CompletableFuture<?>) source),
source -> Mono.from((Publisher<?>) source).toFuture(),
new Descriptor(false, true, false)
);
if (rxJava1Present) {
new RxJava1AdapterRegistrar().register(this);
}
}
/**
* Register an adapter for adapting to and from a {@link Mono}. The provided
* functions can assume that input will never be {@code null} and also that
* any {@link Optional} wrapper is unwrapped.
*/
public void registerMonoAdapter(Class<?> adapteeType,
Function<Object, Mono<?>> toAdapter, Function<Mono<?>, Object> fromAdapter,
Descriptor descriptor) {
this.adapterMap.put(adapteeType, new MonoReactiveAdapter(toAdapter, fromAdapter, descriptor));
}
/**
* Register an adapter for adapting to and from a {@link Flux}. The provided
* functions can assume that input will never be {@code null} and also that
* any {@link Optional} wrapper is unwrapped.
*/
public void registerFluxAdapter(Class<?> adapteeType,
Function<Object, Flux<?>> toAdapter, Function<Flux<?>, Object> fromAdapter) {
this.adapterMap.put(adapteeType, new FluxReactiveAdapter(toAdapter, fromAdapter));
}
/**
* Get the adapter for the given adaptee type to adapt from.
*/
public ReactiveAdapter getAdapterFrom(Class<?> adapteeType) {
return getAdapterFrom(adapteeType, null);
}
/**
* Get the adapter for the given adaptee type to adapt from.
* If the instance is not {@code null} its actual type is used to check.
*/
public ReactiveAdapter getAdapterFrom(Class<?> adapteeType, Object adaptee) {
Class<?> actualType = getActualType(adapteeType, adaptee);
return getAdapterInternal(supportedType -> supportedType.isAssignableFrom(actualType));
}
/**
* Get the adapter for the given adaptee type to adapt to.
*/
public ReactiveAdapter getAdapterTo(Class<?> adapteeType) {
return getAdapterTo(adapteeType, null);
}
/**
* Get the adapter for the given adaptee type to adapt to.
* If the instance is not {@code null} its actual type is used to check.
*/
public ReactiveAdapter getAdapterTo(Class<?> adapteeType, Object adaptee) {
Class<?> actualType = getActualType(adapteeType, adaptee);
return getAdapterInternal(supportedType -> supportedType.equals(actualType));
}
private static Class<?> getActualType(Class<?> adapteeType, Object adaptee) {
adaptee = unwrapOptional(adaptee);
return (adaptee != null ? adaptee.getClass() : adapteeType);
}
private static Object unwrapOptional(Object value) {
if (value != null && value instanceof Optional) {
value = ((Optional<?>) value).orElse(null);
}
return value;
}
private ReactiveAdapter getAdapterInternal(Predicate<Class<?>> adapteeTypePredicate) {
return this.adapterMap.keySet().stream()
.filter(adapteeTypePredicate)
.map(this.adapterMap::get)
.findFirst()
.orElse(null);
}
@SuppressWarnings("unchecked")
private static class MonoReactiveAdapter implements ReactiveAdapter {
private final Function<Object, Mono<?>> toAdapter;
private final Function<Mono<?>, Object> fromAdapter;
private final Descriptor descriptor;
MonoReactiveAdapter(Function<Object, Mono<?>> to, Function<Mono<?>, Object> from, Descriptor descriptor) {
this.toAdapter = to;
this.fromAdapter = from;
this.descriptor = descriptor;
}
@Override
public Descriptor getDescriptor() {
return this.descriptor;
}
@Override
public <T> Mono<T> toMono(Object source) {
source = unwrapOptional(source);
if (source == null) {
return Mono.empty();
}
return (Mono<T>) this.toAdapter.apply(source);
}
@Override
public <T> Flux<T> toFlux(Object source) {
source = unwrapOptional(source);
if (source == null) {
return Flux.empty();
}
return (Flux<T>) this.toMono(source).flux();
}
@Override
public <T> Publisher<T> toPublisher(Object source) {
return toMono(source);
}
@Override
public Object fromPublisher(Publisher<?> source) {
return (source != null ? this.fromAdapter.apply((Mono<?>) source) : null);
}
}
@SuppressWarnings("unchecked")
private static class FluxReactiveAdapter implements ReactiveAdapter {
private final Function<Object, Flux<?>> toAdapter;
private final Function<Flux<?>, Object> fromAdapter;
private final Descriptor descriptor = new Descriptor(true, true, false);
FluxReactiveAdapter(Function<Object, Flux<?>> to, Function<Flux<?>, Object> from) {
this.toAdapter = to;
this.fromAdapter = from;
}
@Override
public Descriptor getDescriptor() {
return this.descriptor;
}
@Override
public <T> Mono<T> toMono(Object source) {
source = unwrapOptional(source);
if (source == null) {
return Mono.empty();
}
return (Mono<T>) this.toAdapter.apply(source).next();
}
@Override
public <T> Flux<T> toFlux(Object source) {
source = unwrapOptional(source);
if (source == null) {
return Flux.empty();
}
return (Flux<T>) this.toAdapter.apply(source);
}
@Override
public <T> Publisher<T> toPublisher(Object source) {
return toFlux(source);
}
@Override
public Object fromPublisher(Publisher<?> source) {
return (source != null ? this.fromAdapter.apply((Flux<?>) source) : null);
}
}
private static class RxJava1AdapterRegistrar {
public void register(ReactiveAdapterRegistry registry) {
registry.registerFluxAdapter(Observable.class,
source -> RxJava1Adapter.observableToFlux((Observable<?>) source),
RxJava1Adapter::publisherToObservable
);
registry.registerMonoAdapter(Single.class,
source -> RxJava1Adapter.singleToMono((Single<?>) source),
RxJava1Adapter::publisherToSingle,
new Descriptor(false, false, false)
);
registry.registerMonoAdapter(Completable.class,
source -> RxJava1Adapter.completableToMono((Completable) source),
RxJava1Adapter::publisherToCompletable,
new Descriptor(false, true, true)
);
}
}
}

View File

@@ -1,62 +0,0 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.convert.support;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.GenericConverter;
/**
* Converter to adapt {@link CompletableFuture} to Reactive Streams and
* Reactor {@link Mono}.
*
* @author Sebastien Deleuze
* @since 5.0
*/
public class MonoToCompletableFutureConverter implements GenericConverter {
@Override
public Set<GenericConverter.ConvertiblePair> getConvertibleTypes() {
Set<GenericConverter.ConvertiblePair> pairs = new LinkedHashSet<>(2);
pairs.add(new GenericConverter.ConvertiblePair(Mono.class, CompletableFuture.class));
pairs.add(new GenericConverter.ConvertiblePair(CompletableFuture.class, Mono.class));
return pairs;
}
@Override
public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
if (source == null) {
return null;
}
else if (CompletableFuture.class.isAssignableFrom(sourceType.getType())) {
return Mono.fromFuture((CompletableFuture<?>) source);
}
else if (CompletableFuture.class.isAssignableFrom(targetType.getType())) {
return Mono.from((Publisher<?>) source).toFuture();
}
return null;
}
}

View File

@@ -1,81 +0,0 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.convert.support;
import java.util.LinkedHashSet;
import java.util.Set;
import org.reactivestreams.Publisher;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Completable;
import rx.Observable;
import rx.Single;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.GenericConverter;
/**
* Converter to adapt RxJava1 {@link Observable}, {@link Single}, and
* {@link Completable} to Reactive Streams and Reactor types.
*
* @author Stephane Maldini
* @author Sebastien Deleuze
* @since 5.0
*/
public final class ReactorToRxJava1Converter implements GenericConverter {
@Override
public Set<GenericConverter.ConvertiblePair> getConvertibleTypes() {
Set<GenericConverter.ConvertiblePair> pairs = new LinkedHashSet<>(6);
pairs.add(new GenericConverter.ConvertiblePair(Flux.class, Observable.class));
pairs.add(new GenericConverter.ConvertiblePair(Observable.class, Flux.class));
pairs.add(new GenericConverter.ConvertiblePair(Mono.class, Single.class));
pairs.add(new GenericConverter.ConvertiblePair(Single.class, Mono.class));
pairs.add(new GenericConverter.ConvertiblePair(Mono.class, Completable.class));
pairs.add(new GenericConverter.ConvertiblePair(Completable.class, Mono.class));
return pairs;
}
@Override
public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
if (source == null) {
return null;
}
if (Observable.class.isAssignableFrom(sourceType.getType())) {
return RxJava1Adapter.observableToFlux((Observable<?>) source);
}
else if (Observable.class.isAssignableFrom(targetType.getType())) {
return RxJava1Adapter.publisherToObservable((Publisher<?>) source);
}
else if (Single.class.isAssignableFrom(sourceType.getType())) {
return RxJava1Adapter.singleToMono((Single<?>) source);
}
else if (Single.class.isAssignableFrom(targetType.getType())) {
return RxJava1Adapter.publisherToSingle((Publisher<?>) source);
}
else if (Completable.class.isAssignableFrom(sourceType.getType())) {
return RxJava1Adapter.completableToMono((Completable) source);
}
else if (Completable.class.isAssignableFrom(targetType.getType())) {
return RxJava1Adapter.publisherToCompletable((Publisher<?>) source);
}
return null;
}
}