Removed FunctionCatalog, BeanFactoryFunctionCatalogTests, ContextFunctionPostProcessorTests

Deprecated FluxWrapperDetector
Clean up and polishing of new BeanFactoryAwareFunctionRegistry
This commit is contained in:
Oleg Zhurakousky
2019-07-29 15:54:59 +02:00
parent cc5c522a8a
commit 90c69368c2
6 changed files with 63 additions and 831 deletions

View File

@@ -32,7 +32,6 @@ import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
@@ -44,7 +43,6 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.function.context.AbstractSpringFunctionAdapterInitializer;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
@@ -165,8 +163,33 @@ public class BeanFactoryAwareFunctionRegistry
return function;
}
private Type discoverFunctionType(Object function, String... names) {
boolean beanDefinitionExists = false;
for (int i = 0; i < names.length && !beanDefinitionExists; i++) {
beanDefinitionExists = this.applicationContext.getBeanFactory().containsBeanDefinition(names[i]);
}
return beanDefinitionExists
? FunctionType.of(FunctionContextUtils.findType(applicationContext.getBeanFactory(), names)).getType()
: new FunctionType(function.getClass()).getType();
}
private String discoverDefaultDefinitionIfNecessary(String definition) {
if (StringUtils.isEmpty(definition)) {
String[] functionNames = this.applicationContext.getBeanNamesForType(Function.class);
if (!ObjectUtils.isEmpty(functionNames)) {
Assert.isTrue(functionNames.length == 1, "Found more then one function in BeanFactory");
definition = functionNames[0];
}
}
return definition;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private Function<?, ?> compose(Class<?> type, String definition, String... acceptedOutputTypes) {
definition = discoverDefaultDefinitionIfNecessary(definition);
if (StringUtils.isEmpty(definition)) {
return null;
}
Function<?, ?> resultFunction = null;
if (this.registrationsByName.containsKey(definition)) {
Object targetFunction = this.registrationsByName.get(definition).getTarget();
@@ -174,65 +197,55 @@ public class BeanFactoryAwareFunctionRegistry
resultFunction = new FunctionInvocationWrapper(targetFunction, functionType, definition, acceptedOutputTypes);
}
else {
if (StringUtils.isEmpty(definition)) {
String[] functionNames = this.applicationContext.getBeanNamesForType(Function.class);
if (ObjectUtils.isEmpty(functionNames)) {
return null;
}
Assert.isTrue(functionNames.length == 1, "Found more then one function in BeanFactory");
definition = functionNames[0];
}
String[] names = StringUtils.delimitedListToStringArray(definition.replaceAll(",", "|").trim(), "|");
StringBuilder composedNameBuilder = new StringBuilder();
String prefix = "";
Type composedFunctionType = null;
Type originFunctionType = null;
for (String name : names) {
Object function = this.locateFunction(name);
if (function == null) {
return null;
}
if (composedFunctionType == null) {
composedFunctionType = beanDefinitionExists(name)
? FunctionType.of(FunctionContextUtils.findType(
(ConfigurableListableBeanFactory) applicationContext.getBeanFactory(), name)).getType()
: new FunctionType(function.getClass()).getType();
}
composedNameBuilder.append(prefix);
composedNameBuilder.append(name);
FunctionRegistration<Object> registration;
Type functionType = null;
Type currentFunctionType = null;
if (function instanceof FunctionRegistration) {
registration = (FunctionRegistration<Object>) function;
functionType = registration.getType().getType();
currentFunctionType = registration.getType().getType();
function = registration.getTarget();
}
else {
String[] aliasNames = this.getAliases(name).toArray(new String[] {});
functionType = beanDefinitionExists(aliasNames)
? FunctionType.of(FunctionContextUtils.findType(
(ConfigurableListableBeanFactory) applicationContext.getBeanFactory(), aliasNames)).getType()
: new FunctionType(function.getClass()).getType();
registration = new FunctionRegistration<>(function, name).type(functionType);
currentFunctionType = this.discoverFunctionType(function, aliasNames);
registration = new FunctionRegistration<>(function, name).type(currentFunctionType);
}
registrationsByFunction.putIfAbsent(function, registration);
registrationsByName.putIfAbsent(name, registration);
function = new FunctionInvocationWrapper(function, functionType, composedNameBuilder.toString(), acceptedOutputTypes);
function = new FunctionInvocationWrapper(function, currentFunctionType, name, acceptedOutputTypes);
if (originFunctionType == null) {
originFunctionType = currentFunctionType;
}
// composition
if (resultFunction == null) {
resultFunction = (Function<?, ?>) function;
}
else {
composedFunctionType = FunctionTypeUtils.compose(composedFunctionType, functionType);
originFunctionType = FunctionTypeUtils.compose(originFunctionType, currentFunctionType);
resultFunction = new FunctionInvocationWrapper(resultFunction.andThen((Function) function),
composedFunctionType, composedNameBuilder.toString(), acceptedOutputTypes);
registration = new FunctionRegistration<Object>(resultFunction, composedNameBuilder.toString())
.type(composedFunctionType);
registrationsByFunction.putIfAbsent(resultFunction, registration);
registrationsByName.putIfAbsent(composedNameBuilder.toString(), registration);
originFunctionType, composedNameBuilder.toString(), acceptedOutputTypes);
}
prefix = "|";
}
FunctionRegistration<Object> registration = new FunctionRegistration<Object>(resultFunction, definition)
.type(originFunctionType);
registrationsByFunction.putIfAbsent(resultFunction, registration);
registrationsByName.putIfAbsent(definition, registration);
}
return resultFunction;
}
@@ -263,15 +276,6 @@ public class BeanFactoryAwareFunctionRegistry
return key;
}
private boolean beanDefinitionExists(String... names) {
for (String name : names) {
if (this.applicationContext.getBeanFactory().containsBeanDefinition(name)) {
return true;
}
}
return false;
}
/**
* Single wrapper for all Suppliers, Functions and Consumers managed by this
* catalog.
@@ -285,7 +289,7 @@ public class BeanFactoryAwareFunctionRegistry
private final Type functionType;
private boolean composed;
private final boolean composed;
private final String[] acceptedOutputMimeTypes;
@@ -293,7 +297,6 @@ public class BeanFactoryAwareFunctionRegistry
FunctionInvocationWrapper(Object target, Type functionType, String functionDefinition, String... acceptedOutputMimeTypes) {
this.target = target;
this.composed = !target.getClass().getName().contains("EnhancerBySpringCGLIB") && target.getClass().getDeclaredFields().length > 1;
this.functionType = functionType;
this.acceptedOutputMimeTypes = acceptedOutputMimeTypes;
@@ -312,9 +315,9 @@ public class BeanFactoryAwareFunctionRegistry
@Override
public Object get() {
Object input = FunctionTypeUtils.isMono(functionType)
Object input = FunctionTypeUtils.isMono(this.functionType)
? Mono.empty()
: (FunctionTypeUtils.isMono(functionType) ? Flux.empty() : null);
: (FunctionTypeUtils.isMono(this.functionType) ? Flux.empty() : null);
return this.doApply(input, false);
}
@@ -337,7 +340,7 @@ public class BeanFactoryAwareFunctionRegistry
@SuppressWarnings({ "rawtypes", "unchecked" })
private Object invokeFunction(Object input) {
if (target instanceof FunctionInvocationWrapper || target instanceof Function) {
if (target instanceof Function) {
return ((Function) target).apply(input);
}
else if (target instanceof Supplier) {
@@ -371,8 +374,9 @@ public class BeanFactoryAwareFunctionRegistry
}
else {
if (FunctionTypeUtils.isConsumer(functionType)) {
result = input instanceof Mono ? Mono.from((Publisher) input).doOnNext((Consumer) this.target).then()
: Flux.from((Publisher) input).doOnNext((Consumer) this.target).then();
result = input instanceof Mono
? Mono.from((Publisher) input).doOnNext((Consumer) this.target).then()
: Flux.from((Publisher) input).doOnNext((Consumer) this.target).then();
}
else {
result = input instanceof Mono
@@ -422,18 +426,12 @@ public class BeanFactoryAwareFunctionRegistry
}
else {
List<MimeType> acceptedContentTypes = MimeTypeUtils.parseMimeTypes(acceptedOutputMimeTypes[0].toString());
for (MimeType acceptedContentType : acceptedContentTypes) {
try {
MessageHeaders headers = new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, acceptedContentType));
convertedValue = messageConverter.toMessage(value, headers);
if (convertedValue != null) {
break;
}
}
catch (Exception e) {
// ignore
}
}
convertedValue = acceptedContentTypes.stream()
.map(acceptedContentType -> messageConverter
.toMessage(value, new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, acceptedContentType))))
.filter(v -> v != null)
.findFirst().orElse(null);
}
return convertedValue;

View File

@@ -218,7 +218,9 @@ public final class FunctionTypeUtils {
: (ObjectUtils.isEmpty(resolvableComposedType.getGenerics())
? ResolvableType.forClass(Object.class) : resolvableComposedType.getGenerics()[1]);
originType = ResolvableType.forClassWithGenerics(Function.class, resolvableOriginType.getGenerics()[0], outType).getType();
originType = ResolvableType.forClassWithGenerics(Function.class,
ObjectUtils.isEmpty(resolvableOriginType.getGenerics()) ? resolvableOriginType : resolvableOriginType.getGenerics()[0],
outType).getType();
}
return originType;
}

View File

@@ -16,59 +16,36 @@
package org.springframework.cloud.function.context.config;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.catalog.AbstractComposableFunctionRegistry;
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionUnregistrationEvent;
import org.springframework.cloud.function.json.GsonMapper;
import org.springframework.cloud.function.json.JacksonMapper;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScan.Filter;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.type.StandardMethodMetadata;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
@@ -127,144 +104,6 @@ public class ContextFunctionCatalogAutoConfiguration {
}
protected static class BeanFactoryFunctionCatalog extends AbstractComposableFunctionRegistry
implements SmartInitializingSingleton, BeanFactoryAware {
private ApplicationEventPublisher applicationEventPublisher;
private ConfigurableListableBeanFactory beanFactory;
/**
* Will collect all suppliers, functions, consumers and function registration as
* late as possible in the lifecycle.
*/
@SuppressWarnings("rawtypes")
@Override
public void afterSingletonsInstantiated() {
Map<String, Supplier> supplierBeans = this.beanFactory.getBeansOfType(Supplier.class);
Map<String, Function> functionBeans = this.beanFactory.getBeansOfType(Function.class);
Map<String, Consumer> consumerBeans = this.beanFactory.getBeansOfType(Consumer.class);
Map<String, FunctionRegistration> functionRegistrationBeans = this.beanFactory
.getBeansOfType(FunctionRegistration.class);
this.doMerge(functionRegistrationBeans, consumerBeans, supplierBeans, functionBeans);
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
}
@PreDestroy
public void close() {
if (this.applicationEventPublisher != null) {
if (this.hasFunctions()) {
this.applicationEventPublisher.publishEvent(
new FunctionUnregistrationEvent(this, Function.class, this.getFunctionNames()));
}
if (this.hasSuppliers()) {
this.applicationEventPublisher.publishEvent(
new FunctionUnregistrationEvent(this, Supplier.class, this.getSupplierNames()));
}
}
}
@Override
protected FunctionType findType(FunctionRegistration<?> functionRegistration, String name) {
FunctionType functionType = super.findType(functionRegistration, name);
if (functionType == null) {
functionType = functionByNameExist(name) ? new FunctionType(functionRegistration.getTarget().getClass())
: this.findType(name);
}
return functionType;
}
private FunctionType findType(String name) {
Type type = FunctionContextUtils.findType(name, this.beanFactory);
return type == null ? null : new FunctionType(type);
}
// @checkstyle:off
/**
* @param initial a registration
* @param consumers consumers to register
* @param suppliers suppliers to register
* @param functions functions to register
* @return a new registration
* @deprecated Was never intended for public use.
*/
@Deprecated
@SuppressWarnings("rawtypes")
Set<FunctionRegistration<?>> merge(Map<String, FunctionRegistration> initial, Map<String, Consumer> consumers,
Map<String, Supplier> suppliers, Map<String, Function> functions) {
this.doMerge(initial, consumers, suppliers, functions);
return null;
}
// @checkstyle:on
private Collection<String> getAliases(String key) {
Collection<String> names = new LinkedHashSet<>();
String value = getQualifier(key);
if (value.equals(key) && this.beanFactory != null) {
names.addAll(Arrays.asList(this.beanFactory.getAliases(key)));
}
names.add(value);
return names;
}
private String getQualifier(String key) {
if (this.beanFactory != null && this.beanFactory.containsBeanDefinition(key)) {
BeanDefinition beanDefinition = this.beanFactory.getBeanDefinition(key);
Object source = beanDefinition.getSource();
if (source instanceof StandardMethodMetadata) {
StandardMethodMetadata metadata = (StandardMethodMetadata) source;
Qualifier qualifier = AnnotatedElementUtils.findMergedAnnotation(metadata.getIntrospectedMethod(),
Qualifier.class);
if (qualifier != null && qualifier.value().length() > 0) {
return qualifier.value();
}
}
}
return key;
}
private boolean functionByNameExist(String name) {
return name == null || this.beanFactory == null || !this.beanFactory.containsBeanDefinition(name);
}
@SuppressWarnings("rawtypes")
private void doMerge(Map<String, FunctionRegistration> functionRegistrationBeans,
Map<String, Consumer> consumerBeans, Map<String, Supplier> supplierBeans,
Map<String, Function> functionBeans) {
Set<FunctionRegistration<?>> registrations = new HashSet<>();
Map<Object, String> targets = new HashMap<>();
// Replace the initial registrations with new ones that have the right names
for (String key : functionRegistrationBeans.keySet()) {
FunctionRegistration<?> registration = functionRegistrationBeans.get(key);
if (registration.getNames().isEmpty()) {
registration.names(getAliases(key));
}
registrations.add(registration);
targets.put(registration.getTarget(), key);
}
Stream.concat(consumerBeans.entrySet().stream(),
Stream.concat(supplierBeans.entrySet().stream(), functionBeans.entrySet().stream()))
.forEach(entry -> {
if (!targets.containsKey(entry.getValue())) {
FunctionRegistration<Object> target = new FunctionRegistration<Object>(entry.getValue(),
getAliases(entry.getKey()).toArray(new String[] {}));
targets.put(target.getTarget(), entry.getKey());
registrations.add(target);
}
});
registrations.forEach(registration -> register(registration, targets.get(registration.getTarget())));
}
}
private static class PreferGsonOrMissingJacksonCondition extends AnyNestedCondition {
PreferGsonOrMissingJacksonCondition() {

View File

@@ -24,8 +24,9 @@ import org.springframework.cloud.function.context.WrapperDetector;
/**
* @author Dave Syer
*
* @deprecated as of 3.0. Not used by the framework
*/
@Deprecated
public class FluxWrapperDetector implements WrapperDetector {
@Override

View File

@@ -1,290 +0,0 @@
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.config;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.BeanFactoryFunctionCatalog;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Dave Syer
*
*/
public class BeanFactoryFunctionCatalogTests {
private BeanFactoryFunctionCatalog processor = new BeanFactoryFunctionCatalog();
@Test
public void basicRegistrationFeatures() {
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
Function<Flux<Integer>, Flux<String>> foos = this.processor.lookup(Function.class,
"foos");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4");
}
@Test
public void lookupFunctionWithEmptyName() {
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
Function<Flux<Integer>, Flux<String>> foos = this.processor.lookup(Function.class,
"");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4");
}
@Test
public void lookupFunctionWithNoType() {
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
Function<Flux<Integer>, Flux<String>> foos = this.processor.lookup("foos");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4");
}
@Test
public void registerFunctionWithType() {
this.processor.register(new FunctionRegistration<Function<Integer, String>>(
(Integer i) -> "i=" + i, "foos").type(
FunctionType.from(Integer.class).to(String.class).getType()));
Function<Flux<Integer>, Flux<String>> foos = this.processor.lookup(Function.class,
"");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("i=2");
}
@Test
public void registerFunctionWithFluxType() {
this.processor
.register(new FunctionRegistration<Function<Flux<Integer>, Flux<String>>>(
ints -> ints.map(i -> "i=" + i), "foos")
.type(FunctionType.from(Integer.class).to(String.class)
.wrap(Flux.class).getType()));
Function<Flux<Integer>, Flux<String>> foos = this.processor.lookup(Function.class,
"");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("i=2");
}
@Test
public void registerFunctionWithMonoType() {
this.processor.register(
new FunctionRegistration<Function<Flux<String>, Mono<Map<String, Integer>>>>(
flux -> flux.collect(HashMap::new,
(map, word) -> map.merge(word, 1, Integer::sum)),
"foos").type(
FunctionType.from(String.class).to(Map.class)
.wrap(Flux.class, Mono.class).getType()));
Function<Flux<String>, Mono<Map<String, Integer>>> foos = this.processor
.lookup(Function.class, "");
assertThat(foos.apply(Flux.just("one", "one", "two")).block())
.containsEntry("one", 2);
}
@Test
public void lookupNonExistentConsumerWithEmptyName() {
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
Consumer<Flux<String>> foos = this.processor.lookup(Consumer.class, "");
assertThat(foos).isNull();
}
@Test
public void composeFunction() {
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
this.processor.register(new FunctionRegistration<>(new Bars(), "bars"));
Function<Flux<Integer>, Flux<String>> foos = this.processor.lookup(Function.class,
"foos,bars");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4");
}
@Test
public void composeWithFiniteFunction() {
Function<String, String> func1 = x -> x.toUpperCase();
this.processor.register(new FunctionRegistration<>(func1, "func1"));
this.processor.register(
new FunctionRegistration<>(new FluxThenMonoFunction(), "func2"));
Function<Flux<String>, Mono<Long>> foos = this.processor.lookup(Function.class,
"func1,func2");
assertThat(foos.apply(Flux.fromArray(new String[] { "a", "b", "c" })).block())
.isEqualTo(3);
}
@Test
public void composeWithFiniteFunctionAndContinueWithCompatible() {
Function<String, String> func1 = x -> x.toUpperCase();
this.processor.register(new FunctionRegistration<>(func1, "func1"));
this.processor.register(
new FunctionRegistration<>(new FluxThenMonoFunction(), "func2"));
this.processor.register(
new FunctionRegistration<>(new MonoThenFluxFunction(), "func3"));
Function<Flux<String>, Flux<Integer>> foos = this.processor.lookup(Function.class,
"func1,func2,func3");
assertThat(foos.apply(Flux.fromArray(new String[] { "a", "b", "c" }))
.collectList().block().size()).isEqualTo(3);
}
@Test(expected = IllegalStateException.class)
public void composeIncompatibleFunctions() {
Function<String, String> func1 = x -> x.toUpperCase();
this.processor.register(new FunctionRegistration<>(func1, "func1"));
this.processor.register(
new FunctionRegistration<>(new FluxThenMonoFunction(), "func2"));
this.processor.lookup(Function.class, "func2,func1");
}
@Test
public void composeSupplier() {
this.processor.register(new FunctionRegistration<>(new Source(), "numbers"));
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
Supplier<Flux<String>> foos = this.processor.lookup(Supplier.class,
"numbers,foos");
assertThat(foos.get().blockFirst()).isEqualTo("6");
}
@Test
public void composeUniqueSupplier() {
this.processor.register(new FunctionRegistration<>(new Source(), "numbers"));
Supplier<Flux<Integer>> foos = this.processor.lookup(Supplier.class, "");
assertThat(foos.get().blockFirst()).isEqualTo(3);
}
@Test
public void composeConsumer() {
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
Sink sink = new Sink();
this.processor.register(new FunctionRegistration<>(sink, "sink"));
Function<Flux<Integer>, Mono<Void>> foos = this.processor.lookup(Function.class,
"foos,sink");
foos.apply(Flux.just(2)).subscribe();
assertThat(sink.values).contains("4");
}
@Test
public void composeUniqueConsumer() {
Sink sink = new Sink();
this.processor.register(new FunctionRegistration<>(sink, "sink"));
Function<Flux<String>, Mono<Void>> foos = this.processor.lookup(Function.class,
"");
foos.apply(Flux.just("2")).subscribe();
assertThat(sink.values).contains("2");
}
@Test
public void composeSupplierAndConsumer() {
AtomicReference<String> ref = new AtomicReference<String>();
Supplier<String> s = () -> "hello";
this.processor.register(new FunctionRegistration<>(s, "supplier"));
Consumer<String> c = x -> ref.set(x.toUpperCase());
this.processor.register(new FunctionRegistration<>(c, "consumer"));
Supplier<Mono<Void>> f = this.processor.lookup("supplier|consumer");
f.get().block();
assertThat(ref.get()).isEqualTo("HELLO");
}
@Test(expected = IllegalStateException.class)
public void failComposeSupplierWithMultipleConsumers() {
AtomicReference<String> ref = new AtomicReference<String>();
Supplier<String> s = () -> "hello";
this.processor.register(new FunctionRegistration<>(s, "supplier"));
Consumer<String> c = x -> ref.set(x.toUpperCase());
this.processor.register(new FunctionRegistration<>(c, "consumer"));
Consumer<String> z = x -> ref.set(x.toUpperCase());
this.processor.register(new FunctionRegistration<>(z, "z"));
this.processor.lookup("supplier|consumer|z");
}
@Test
public void composeSupplierAndMultipleFunctions() {
Supplier<String> s = () -> "hello";
this.processor.register(new FunctionRegistration<>(s, "supplier"));
Function<String, String> uppercase = x -> x.toUpperCase();
this.processor.register(new FunctionRegistration<>(uppercase, "uppercase"));
Function<String, String> concat = x -> x + x;
this.processor.register(new FunctionRegistration<>(concat, "concat"));
Supplier<Flux<String>> f = this.processor.lookup("supplier|uppercase|concat");
assertThat(f.get().blockFirst()).isEqualTo("HELLOHELLO");
}
protected static class Source implements Supplier<Integer> {
@Override
public Integer get() {
return 3;
}
}
protected static class Sink implements Consumer<String> {
private List<String> values = new ArrayList<>();
@Override
public void accept(String value) {
this.values.add(value);
}
}
protected static class Foos implements Function<Integer, String> {
@Override
public String apply(Integer t) {
return "" + 2 * t;
}
}
protected static class Bars implements Function<String, String> {
@Override
public String apply(String t) {
return "Hello " + t;
}
}
protected static class FluxThenMonoFunction
implements Function<Flux<String>, Mono<Long>> {
@Override
public Mono<Long> apply(Flux<String> t) {
return t.count();
}
}
protected static class MonoThenFluxFunction
implements Function<Mono<Long>, Flux<Integer>> {
@Override
public Flux<Integer> apply(Mono<Long> t) {
return Flux.range(0, Integer.parseInt(t.block().toString()));
}
}
}

View File

@@ -1,318 +0,0 @@
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.config;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.StringTokenizer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.junit.After;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.beans.BeanUtils;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.BeanFactoryFunctionCatalog;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ClassUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Dave Syer
*
*/
// TODO: test all sorts of error conditions (duplicate registrations, incompatible types
// for functions with the same name, uncomposable combinations)
public class ContextFunctionPostProcessorTests {
private BeanFactoryFunctionCatalog processor = new BeanFactoryFunctionCatalog();
private URLClassLoader classLoader;
private ClassLoader contextClassLoader;
@After
public void close() throws Exception {
if (this.classLoader != null) {
this.classLoader.close();
}
if (Thread.currentThread().getContextClassLoader() != null) {
ClassUtils.overrideThreadContextClassLoader(this.contextClassLoader);
}
}
@Test
public void basicRegistrationFeatures() {
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
@SuppressWarnings("unchecked")
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) this.processor
.lookup(null, "foos"); //lookupFunction("foos");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4");
}
@SuppressWarnings("deprecation")
@Test
public void registrationThroughMerge() {
FunctionRegistration<Foos> registration = new FunctionRegistration<>(new Foos(),
"foos");
this.processor.merge(Collections.singletonMap("foos", registration),
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
@SuppressWarnings("unchecked")
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) this.processor
.lookup(null, "foos");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4");
}
@SuppressWarnings("deprecation")
@Test
public void registrationThroughMergeFromNamedFunction() {
this.processor.merge(Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap(), Collections.singletonMap("foos", new Foos()));
@SuppressWarnings("unchecked")
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) this.processor
.lookup(null, "foos");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4");
}
@Test
public void composeWithComma() {
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
this.processor.register(new FunctionRegistration<>(new Bars(), "bars"));
@SuppressWarnings("unchecked")
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) this.processor
.lookup(null, "foos,bars");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4");
assertThat(this.processor.getRegistration(foos).getNames())
.containsExactly("foos|bars");
}
@Test
public void supplierAndFunction() {
this.processor.register(
new FunctionRegistration<Supplier<String>>(() -> "foo", "supplier"));
this.processor.register(new FunctionRegistration<Function<String, String>>(
(x) -> x.toUpperCase(), "function"));
@SuppressWarnings("unchecked")
Supplier<Flux<String>> supplier = (Supplier<Flux<String>>) this.processor
.lookup(Supplier.class, "supplier|function");
assertThat(supplier.get().blockFirst()).isEqualTo("FOO");
assertThat(this.processor.getRegistration(supplier).getNames())
.containsExactly("supplier|function");
}
@SuppressWarnings("unchecked")
@Test
public void supplierAndConsumer() {
this.processor.register(
new FunctionRegistration<Supplier<String>>(() -> "foo", "supplier"));
this.processor.register(new FunctionRegistration<Consumer<String>>(
System.out::println, "consumer"));
Supplier<Mono<Void>> supplier = (Supplier<Mono<Void>>) this.processor
.lookup(Supplier.class, "supplier|consumer");
assertThat(supplier.get().block()).isNull();
}
@Test
public void compose() {
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
this.processor.register(new FunctionRegistration<>(new Bars(), "bars"));
@SuppressWarnings("unchecked")
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) this.processor
.lookup(null, "foos|bars");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4");
assertThat(this.processor.getRegistration(foos).getNames())
.containsExactly("foos|bars");
}
@Test
public void composeWrapper() {
this.processor.register(new FunctionRegistration<>(new WrappedSource(), "ints"));
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
@SuppressWarnings("unchecked")
Supplier<Flux<String>> foos = (Supplier<Flux<String>>) this.processor
.lookup(Supplier.class, "ints|foos");
assertThat(foos.get().blockFirst()).isEqualTo("8");
assertThat(this.processor.getRegistration(foos).getNames())
.containsExactly("ints|foos");
assertThat(this.processor.getRegistration(foos).getType().getOutputWrapper())
.isEqualTo(Flux.class);
}
@Test
public void isolatedFunction() {
this.contextClassLoader = ClassUtils
.overrideThreadContextClassLoader(getClass().getClassLoader());
this.processor.register(new FunctionRegistration<>(create(Foos.class), "foos"));
@SuppressWarnings("unchecked")
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) this.processor
.lookup(null, "foos");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4");
}
@Test
public void isolatedSupplier() {
this.contextClassLoader = ClassUtils
.overrideThreadContextClassLoader(getClass().getClassLoader());
this.processor
.register(new FunctionRegistration<>(create(Source.class), "source"));
@SuppressWarnings("unchecked")
Supplier<Flux<Integer>> source = (Supplier<Flux<Integer>>) this.processor
.lookup(Supplier.class, "source");
assertThat(source.get().blockFirst()).isEqualTo(4);
}
@Test
public void isolatedConsumer() {
this.contextClassLoader = ClassUtils
.overrideThreadContextClassLoader(getClass().getClassLoader());
Object target = create(Sink.class);
this.processor.register(new FunctionRegistration<>(target, "sink"));
@SuppressWarnings("unchecked")
Function<Flux<String>, Mono<Void>> sink = (Function<Flux<String>, Mono<Void>>) this.processor
.lookup(null, "sink");
sink.apply(Flux.just("Hello")).subscribe();
@SuppressWarnings("unchecked")
List<String> values = (List<String>) ReflectionTestUtils.getField(target,
"values");
assertThat(values).contains("Hello");
}
private Object create(Class<?> type) {
// Want to load these the test types in a disposable classloader:
List<URL> urls = new ArrayList<>();
String jcp = System.getProperty("java.class.path");
StringTokenizer jcpEntries = new StringTokenizer(jcp, File.pathSeparator);
while (jcpEntries.hasMoreTokens()) {
String pathEntry = jcpEntries.nextToken();
try {
urls.add(new File(pathEntry).toURI().toURL());
}
catch (MalformedURLException e) {
}
}
this.classLoader = new URLClassLoader(urls.toArray(new URL[0]),
getClass().getClassLoader().getParent());
return BeanUtils.instantiateClass(
ClassUtils.resolveClassName(type.getName(), this.classLoader));
}
public static class Foos implements Function<Integer, String> {
@Override
public String apply(Integer t) {
assertThat(ClassUtils.resolveClassName(Bar.class.getName(), null)
.getClassLoader()).isEqualTo(getClass().getClassLoader());
return "" + 2 * t;
}
}
public static class Bars implements Function<String, String> {
@Override
public String apply(String t) {
assertThat(ClassUtils.resolveClassName(Bar.class.getName(), null)
.getClassLoader()).isEqualTo(getClass().getClassLoader());
return "Hello " + t;
}
}
public static class Sink implements Consumer<String> {
private List<String> values = new ArrayList<>();
@Override
public void accept(String t) {
assertThat(ClassUtils.resolveClassName(Bar.class.getName(), null)
.getClassLoader()).isEqualTo(getClass().getClassLoader());
this.values.add(t);
}
}
public static class Source implements Supplier<Integer> {
@Override
public Integer get() {
return 4;
}
}
public static class WrappedSource implements Supplier<Flux<Integer>> {
@Override
public Flux<Integer> get() {
return Flux.just(4);
}
}
public static class Foo {
private String value;
public Foo(String value) {
this.value = value;
}
Foo() {
}
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class Bar {
private String message;
public Bar(String value) {
this.message = value;
}
Bar() {
}
public String getMessage() {
return this.message;
}
public void setMessage(String message) {
this.message = message;
}
}
}