Polishing

This commit is contained in:
Juergen Hoeller
2017-03-21 17:44:47 +01:00
parent 85f64706a8
commit e892e02f41
19 changed files with 162 additions and 137 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -102,7 +102,7 @@ public class ReactiveAdapter {
/**
* Adapt the given instance to a Reactive Streams Publisher.
* @param source the source object to adapt from
* @return the Publisher repesenting the adaptation
* @return the Publisher representing the adaptation
*/
@SuppressWarnings("unchecked")
public <T> Publisher<T> toPublisher(Object source) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,10 +30,7 @@ import rx.RxReactiveStreams;
import org.springframework.util.ClassUtils;
import static org.springframework.core.ReactiveTypeDescriptor.multiValue;
import static org.springframework.core.ReactiveTypeDescriptor.noValue;
import static org.springframework.core.ReactiveTypeDescriptor.singleOptionalValue;
import static org.springframework.core.ReactiveTypeDescriptor.singleRequiredValue;
import static org.springframework.core.ReactiveTypeDescriptor.*;
/**
* A registry of adapters to adapt a Reactive Streams {@link Publisher} to/from
@@ -69,15 +66,12 @@ public class ReactiveAdapterRegistry {
* Create a registry and auto-register default adapters.
*/
public ReactiveAdapterRegistry() {
if (reactorPresent) {
new ReactorRegistrar().registerAdapters(this);
}
if (rxJava1Present && rxJava1Adapter) {
new RxJava1Registrar().registerAdapters(this);
}
if (rxJava2Present) {
new RxJava2Registrar().registerAdapters(this);
}
@@ -111,15 +105,16 @@ public class ReactiveAdapterRegistry {
* Get the adapter for the given reactive type. Or if a "source" object is
* provided, its actual type is used instead.
* @param reactiveType the reactive type
* @param source an instance of the reactive type (i.e. to adapt from)
* (may be {@code null} if a concrete source object is given)
* @param source an instance of the reactive type
* (i.e. to adapt from; may be {@code null} if the reactive type is specified)
*/
public ReactiveAdapter getAdapter(Class<?> reactiveType, Object source) {
source = (source instanceof Optional ? ((Optional<?>) source).orElse(null) : source);
Class<?> clazz = (source != null ? source.getClass() : reactiveType);
Object sourceToUse = (source instanceof Optional ? ((Optional<?>) source).orElse(null) : source);
Class<?> clazz = (sourceToUse != null ? sourceToUse.getClass() : reactiveType);
return this.adapters.stream()
.filter(adapter -> adapter.getReactiveType().equals(clazz))
.filter(adapter -> adapter.getReactiveType() == clazz)
.findFirst()
.orElseGet(() ->
this.adapters.stream()
@@ -132,7 +127,6 @@ public class ReactiveAdapterRegistry {
private static class ReactorRegistrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
// Flux and Mono ahead of Publisher...
registry.registerReactiveType(
@@ -161,6 +155,7 @@ public class ReactiveAdapterRegistry {
}
}
private static class RxJava1Registrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
@@ -182,6 +177,7 @@ public class ReactiveAdapterRegistry {
}
}
private static class RxJava2Registrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
@@ -213,6 +209,7 @@ public class ReactiveAdapterRegistry {
}
}
/**
* Extension of ReactiveAdapter that wraps adapted (raw) Publisher's as
* {@link Flux} or {@link Mono} depending on the underlying reactive type's