From fb04324ac9b1dfffaf39e140766e60e64c062170 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Tue, 1 May 2018 08:43:11 -0400 Subject: [PATCH] Factor out a new strategy for wrapper type detection Using this strategy libraries could be developed for supporting Flux-like libraries (e.g. kstreams) that are not actually reactive streams implementations. --- .../function/core/FunctionFactoryUtils.java | 0 .../core/FunctionFactoryUtilsTests.java | 0 .../cloud/function/context/FunctionType.java | 22 +++- .../function/context/WrapperDetector.java | 29 +++++ ...ntextFunctionCatalogAutoConfiguration.java | 116 +++++++++--------- .../context/config/FluxWrapperDetector.java | 40 ++++++ .../main/resources/META-INF/spring.factories | 5 +- 7 files changed, 147 insertions(+), 65 deletions(-) rename {spring-cloud-function-core/src/main => spring-cloud-function-compiler/src/test}/java/org/springframework/cloud/function/core/FunctionFactoryUtils.java (100%) rename {spring-cloud-function-core => spring-cloud-function-compiler}/src/test/java/org/springframework/cloud/function/core/FunctionFactoryUtilsTests.java (100%) create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/WrapperDetector.java create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/FluxWrapperDetector.java diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionFactoryUtils.java b/spring-cloud-function-compiler/src/test/java/org/springframework/cloud/function/core/FunctionFactoryUtils.java similarity index 100% rename from spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionFactoryUtils.java rename to spring-cloud-function-compiler/src/test/java/org/springframework/cloud/function/core/FunctionFactoryUtils.java diff --git a/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/core/FunctionFactoryUtilsTests.java b/spring-cloud-function-compiler/src/test/java/org/springframework/cloud/function/core/FunctionFactoryUtilsTests.java similarity index 100% rename from spring-cloud-function-core/src/test/java/org/springframework/cloud/function/core/FunctionFactoryUtilsTests.java rename to spring-cloud-function-compiler/src/test/java/org/springframework/cloud/function/core/FunctionFactoryUtilsTests.java diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionType.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionType.java index aee52a1f3..fab2bb672 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionType.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionType.java @@ -17,18 +17,17 @@ package org.springframework.cloud.function.context; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; -import java.util.Optional; +import java.util.ArrayList; +import java.util.List; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import org.reactivestreams.Publisher; - import org.springframework.core.ResolvableType; +import org.springframework.core.io.support.SpringFactoriesLoader; import org.springframework.messaging.Message; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * @author Dave Syer @@ -39,6 +38,8 @@ public class FunctionType { public static FunctionType UNCLASSIFIED = new FunctionType(ResolvableType .forClassWithGenerics(Function.class, Object.class, Object.class).getType()); + private static List transformers; + final private Type type; final private Class inputType; @@ -101,8 +102,17 @@ public class FunctionType { if (type instanceof ParameterizedType) { type = ((ParameterizedType) type).getRawType(); } - return Publisher.class.equals(type) || Flux.class.equals(type) - || Mono.class.equals(type) || Optional.class.equals(type); + if (transformers == null) { + transformers = new ArrayList<>(); + transformers.addAll( + SpringFactoriesLoader.loadFactories(WrapperDetector.class, null)); + } + for (WrapperDetector transformer : transformers) { + if (transformer.isWrapper(type)) { + return true; + } + } + return false; } public static FunctionType of(Type function) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/WrapperDetector.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/WrapperDetector.java new file mode 100644 index 000000000..419c94d4e --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/WrapperDetector.java @@ -0,0 +1,29 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context; + +import java.lang.reflect.Type; + +/** + * @author Dave Syer + * + */ +public interface WrapperDetector { + + boolean isWrapper(Type type); + +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index d0a031dbb..dcf7db156 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -59,7 +59,6 @@ import org.springframework.cloud.function.core.FluxConsumer; import org.springframework.cloud.function.core.FluxFunction; import org.springframework.cloud.function.core.FluxSupplier; import org.springframework.cloud.function.core.FunctionFactoryMetadata; -import org.springframework.cloud.function.core.FunctionFactoryUtils; import org.springframework.cloud.function.core.IsolatedConsumer; import org.springframework.cloud.function.core.IsolatedFunction; import org.springframework.cloud.function.core.IsolatedSupplier; @@ -396,18 +395,18 @@ public class ContextFunctionCatalogAutoConfiguration { return names; } - private void wrap(FunctionRegistration registration, String key) { + private void wrap(FunctionRegistration registration, String key) { Object target = registration.getTarget(); this.names.put(target, key); if (registration.getType() != null) { this.types.put(key, registration.getType()); } else { - findType(target); + registration.type(findType(target).getType()); } Class type; - target = target(target, key); - registration.target(target); + registration = transform(registration); + target = registration.getTarget(); if (target instanceof Supplier) { type = Supplier.class; for (String name : registration.getNames()) { @@ -437,6 +436,60 @@ public class ContextFunctionCatalogAutoConfiguration { } } + private FunctionRegistration transform(FunctionRegistration registration) { + return fluxify(isolated(registration)); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private FunctionRegistration fluxify(FunctionRegistration input) { + FunctionRegistration registration = (FunctionRegistration) input; + Object target = registration.getTarget(); + FunctionType type = registration.getType(); + boolean flux = hasFluxTypes(type); + if (!flux) { + if (target instanceof Supplier) { + target = new FluxSupplier((Supplier) target); + } + else if (target instanceof Function) { + target = new FluxFunction((Function) target); + } + else if (target instanceof Consumer) { + target = new FluxConsumer((Consumer) target); + } + registration.target(target); + } + return registration; + } + + private boolean hasFluxTypes(FunctionType type) { + return type.isWrapper(); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private FunctionRegistration isolated(FunctionRegistration input) { + FunctionRegistration registration = (FunctionRegistration) input; + Object target = registration.getTarget(); + boolean isolated = getClass().getClassLoader() != target.getClass() + .getClassLoader(); + if (target instanceof Supplier) { + if (isolated) { + target = new IsolatedSupplier((Supplier) target); + } + } + else if (target instanceof Function) { + if (isolated) { + target = new IsolatedFunction((Function) target); + } + } + else if (target instanceof Consumer) { + if (isolated) { + target = new IsolatedConsumer((Consumer) target); + } + } + registration.target(target); + return registration; + } + private String getQualifier(String key) { if (registry != null && registry.containsBeanDefinition(key)) { BeanDefinition beanDefinition = registry.getBeanDefinition(key); @@ -453,59 +506,6 @@ public class ContextFunctionCatalogAutoConfiguration { return key; } - @SuppressWarnings({ "unchecked", "rawtypes" }) - private Object target(Object target, String key) { - boolean isolated = getClass().getClassLoader() != target.getClass() - .getClassLoader(); - if (target instanceof Supplier) { - boolean flux = isFluxSupplier(key, (Supplier) target); - if (isolated) { - target = new IsolatedSupplier((Supplier) target); - } - if (!flux) { - target = new FluxSupplier((Supplier) target); - } - } - else if (target instanceof Function) { - boolean flux = isFluxFunction(key, (Function) target); - if (isolated) { - target = new IsolatedFunction((Function) target); - } - if (!flux) { - target = new FluxFunction((Function) target); - } - } - else if (target instanceof Consumer) { - boolean flux = isFluxConsumer(key, (Consumer) target); - if (isolated) { - target = new IsolatedConsumer((Consumer) target); - } - if (!flux) { - target = new FluxConsumer((Consumer) target); - } - } - return target; - } - - private boolean isFluxFunction(String name, Function function) { - boolean fluxTypes = this.hasFluxTypes(function); - return fluxTypes || FunctionFactoryUtils.isFluxFunction(function); - } - - private boolean isFluxConsumer(String name, Consumer consumer) { - boolean fluxTypes = this.hasFluxTypes(consumer); - return fluxTypes || FunctionFactoryUtils.isFluxConsumer(consumer); - } - - private boolean isFluxSupplier(String name, Supplier supplier) { - boolean fluxTypes = this.hasFluxTypes(supplier); - return fluxTypes || FunctionFactoryUtils.isFluxSupplier(supplier); - } - - private boolean hasFluxTypes(Object function) { - return findType(function).isWrapper(); - } - private FunctionType findType(String name, AbstractBeanDefinition definition) { Object source = definition.getSource(); FunctionType param = null; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/FluxWrapperDetector.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/FluxWrapperDetector.java new file mode 100644 index 000000000..f92c3daba --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/FluxWrapperDetector.java @@ -0,0 +1,40 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.config; + +import java.lang.reflect.Type; + +import org.reactivestreams.Publisher; + +import org.springframework.cloud.function.context.WrapperDetector; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * @author Dave Syer + * + */ +public class FluxWrapperDetector implements WrapperDetector { + + @Override + public boolean isWrapper(Type type) { + return Publisher.class.equals(type) || Flux.class.equals(type) + || Mono.class.equals(type); + } + +} diff --git a/spring-cloud-function-context/src/main/resources/META-INF/spring.factories b/spring-cloud-function-context/src/main/resources/META-INF/spring.factories index f3a0792fc..54884cc50 100644 --- a/spring-cloud-function-context/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-function-context/src/main/resources/META-INF/spring.factories @@ -1,2 +1,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration \ No newline at end of file +org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration + +org.springframework.cloud.function.context.WrapperDetector=\ +org.springframework.cloud.function.context.config.FluxWrapperDetector