Factor out a new strategy for wrapper type detection

Using this strategy libraries could be developed for supporting
Flux-like libraries (e.g. kstreams) that are not actually
reactive streams implementations.
This commit is contained in:
Dave Syer
2018-05-01 08:43:11 -04:00
parent ebd1646308
commit fb04324ac9
7 changed files with 147 additions and 65 deletions

View File

@@ -17,18 +17,17 @@ package org.springframework.cloud.function.context;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Optional;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.support.SpringFactoriesLoader;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Dave Syer
@@ -39,6 +38,8 @@ public class FunctionType {
public static FunctionType UNCLASSIFIED = new FunctionType(ResolvableType
.forClassWithGenerics(Function.class, Object.class, Object.class).getType());
private static List<WrapperDetector> transformers;
final private Type type;
final private Class<?> inputType;
@@ -101,8 +102,17 @@ public class FunctionType {
if (type instanceof ParameterizedType) {
type = ((ParameterizedType) type).getRawType();
}
return Publisher.class.equals(type) || Flux.class.equals(type)
|| Mono.class.equals(type) || Optional.class.equals(type);
if (transformers == null) {
transformers = new ArrayList<>();
transformers.addAll(
SpringFactoriesLoader.loadFactories(WrapperDetector.class, null));
}
for (WrapperDetector transformer : transformers) {
if (transformer.isWrapper(type)) {
return true;
}
}
return false;
}
public static FunctionType of(Type function) {

View File

@@ -0,0 +1,29 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context;
import java.lang.reflect.Type;
/**
* @author Dave Syer
*
*/
public interface WrapperDetector {
boolean isWrapper(Type type);
}

View File

@@ -59,7 +59,6 @@ import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxFunction;
import org.springframework.cloud.function.core.FluxSupplier;
import org.springframework.cloud.function.core.FunctionFactoryMetadata;
import org.springframework.cloud.function.core.FunctionFactoryUtils;
import org.springframework.cloud.function.core.IsolatedConsumer;
import org.springframework.cloud.function.core.IsolatedFunction;
import org.springframework.cloud.function.core.IsolatedSupplier;
@@ -396,18 +395,18 @@ public class ContextFunctionCatalogAutoConfiguration {
return names;
}
private void wrap(FunctionRegistration<Object> registration, String key) {
private void wrap(FunctionRegistration<?> registration, String key) {
Object target = registration.getTarget();
this.names.put(target, key);
if (registration.getType() != null) {
this.types.put(key, registration.getType());
}
else {
findType(target);
registration.type(findType(target).getType());
}
Class<?> type;
target = target(target, key);
registration.target(target);
registration = transform(registration);
target = registration.getTarget();
if (target instanceof Supplier) {
type = Supplier.class;
for (String name : registration.getNames()) {
@@ -437,6 +436,60 @@ public class ContextFunctionCatalogAutoConfiguration {
}
}
private FunctionRegistration<?> transform(FunctionRegistration<?> registration) {
return fluxify(isolated(registration));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private FunctionRegistration<?> fluxify(FunctionRegistration<?> input) {
FunctionRegistration<Object> registration = (FunctionRegistration<Object>) input;
Object target = registration.getTarget();
FunctionType type = registration.getType();
boolean flux = hasFluxTypes(type);
if (!flux) {
if (target instanceof Supplier<?>) {
target = new FluxSupplier((Supplier<?>) target);
}
else if (target instanceof Function<?, ?>) {
target = new FluxFunction((Function<?, ?>) target);
}
else if (target instanceof Consumer<?>) {
target = new FluxConsumer((Consumer<?>) target);
}
registration.target(target);
}
return registration;
}
private boolean hasFluxTypes(FunctionType type) {
return type.isWrapper();
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private FunctionRegistration<?> isolated(FunctionRegistration<?> input) {
FunctionRegistration<Object> registration = (FunctionRegistration<Object>) input;
Object target = registration.getTarget();
boolean isolated = getClass().getClassLoader() != target.getClass()
.getClassLoader();
if (target instanceof Supplier<?>) {
if (isolated) {
target = new IsolatedSupplier((Supplier<?>) target);
}
}
else if (target instanceof Function<?, ?>) {
if (isolated) {
target = new IsolatedFunction((Function<?, ?>) target);
}
}
else if (target instanceof Consumer<?>) {
if (isolated) {
target = new IsolatedConsumer((Consumer<?>) target);
}
}
registration.target(target);
return registration;
}
private String getQualifier(String key) {
if (registry != null && registry.containsBeanDefinition(key)) {
BeanDefinition beanDefinition = registry.getBeanDefinition(key);
@@ -453,59 +506,6 @@ public class ContextFunctionCatalogAutoConfiguration {
return key;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private Object target(Object target, String key) {
boolean isolated = getClass().getClassLoader() != target.getClass()
.getClassLoader();
if (target instanceof Supplier<?>) {
boolean flux = isFluxSupplier(key, (Supplier<?>) target);
if (isolated) {
target = new IsolatedSupplier((Supplier<?>) target);
}
if (!flux) {
target = new FluxSupplier((Supplier<?>) target);
}
}
else if (target instanceof Function<?, ?>) {
boolean flux = isFluxFunction(key, (Function<?, ?>) target);
if (isolated) {
target = new IsolatedFunction((Function<?, ?>) target);
}
if (!flux) {
target = new FluxFunction((Function<?, ?>) target);
}
}
else if (target instanceof Consumer<?>) {
boolean flux = isFluxConsumer(key, (Consumer<?>) target);
if (isolated) {
target = new IsolatedConsumer((Consumer<?>) target);
}
if (!flux) {
target = new FluxConsumer((Consumer<?>) target);
}
}
return target;
}
private boolean isFluxFunction(String name, Function<?, ?> function) {
boolean fluxTypes = this.hasFluxTypes(function);
return fluxTypes || FunctionFactoryUtils.isFluxFunction(function);
}
private boolean isFluxConsumer(String name, Consumer<?> consumer) {
boolean fluxTypes = this.hasFluxTypes(consumer);
return fluxTypes || FunctionFactoryUtils.isFluxConsumer(consumer);
}
private boolean isFluxSupplier(String name, Supplier<?> supplier) {
boolean fluxTypes = this.hasFluxTypes(supplier);
return fluxTypes || FunctionFactoryUtils.isFluxSupplier(supplier);
}
private boolean hasFluxTypes(Object function) {
return findType(function).isWrapper();
}
private FunctionType findType(String name, AbstractBeanDefinition definition) {
Object source = definition.getSource();
FunctionType param = null;

View File

@@ -0,0 +1,40 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.config;
import java.lang.reflect.Type;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.WrapperDetector;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Dave Syer
*
*/
public class FluxWrapperDetector implements WrapperDetector {
@Override
public boolean isWrapper(Type type) {
return Publisher.class.equals(type) || Flux.class.equals(type)
|| Mono.class.equals(type);
}
}

View File

@@ -1,2 +1,5 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration
org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration
org.springframework.cloud.function.context.WrapperDetector=\
org.springframework.cloud.function.context.config.FluxWrapperDetector