renamed ApplicationContextFunctionCatalog

This commit is contained in:
markfisher
2017-01-13 10:45:07 -05:00
parent 5c79ff58a6
commit cc3bb8f645
3 changed files with 95 additions and 87 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context;
import java.lang.reflect.ParameterizedType;
@@ -47,40 +48,44 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
* @author Mark Fisher
*/
@Configuration
@ConditionalOnClass(ApplicationContextFunctionCatalog.class)
@ConditionalOnClass(InMemoryFunctionCatalog.class)
@ConditionalOnMissingBean(FunctionCatalog.class)
public class ContextFunctionCatalogAutoConfiguration {
@Autowired(required = false)
private Map<String, Function<?, ?>> functions = Collections.emptyMap();
@Autowired(required = false)
private Map<String, Consumer<?>> consumers = Collections.emptyMap();
@Autowired(required = false)
private Map<String, Supplier<?>> suppliers = Collections.emptyMap();
@Autowired(required = false)
private Map<String, Function<?, ?>> functions = Collections.emptyMap();
@Autowired(required = false)
private Map<String, Consumer<?>> consumers = Collections.emptyMap();
@Bean
public FunctionCatalog functionCatalog(ContextFunctionPostProcessor processor,
ObjectMapper mapper) {
return new ApplicationContextFunctionCatalog(
public FunctionCatalog functionCatalog(ContextFunctionPostProcessor processor, ObjectMapper mapper) {
return new InMemoryFunctionCatalog(
processor.wrapSuppliers(mapper, suppliers),
processor.wrapFunctions(mapper, functions),
processor.wrapConsumers(mapper, consumers),
processor.wrapSuppliers(mapper, suppliers));
processor.wrapConsumers(mapper, consumers));
}
@Component
public static class ContextFunctionPostProcessor
implements BeanFactoryPostProcessor, BeanDefinitionRegistryPostProcessor {
private Set<String> suppliers = new HashSet<>();
private Set<String> functions = new HashSet<>();
private Set<String> consumers = new HashSet<>();
private Set<String> suppliers = new HashSet<>();
private BeanDefinitionRegistry registry;
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry)
throws BeansException {
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) {
this.registry = registry;
}
@@ -100,22 +105,6 @@ public class ContextFunctionCatalogAutoConfiguration {
return result;
}
public Map<String, Consumer<?>> wrapConsumers(ObjectMapper mapper,
Map<String, Consumer<?>> consumers) {
Map<String, Consumer<?>> result = new HashMap<>();
for (String key : consumers.keySet()) {
if (this.consumers.contains(key)) {
@SuppressWarnings("unchecked")
Consumer<Flux<?>> consumer = (Consumer<Flux<?>>) consumers.get(key);
result.put(key, wrapConsumer(consumer, mapper, key));
}
else {
result.put(key, consumers.get(key));
}
}
return result;
}
public Map<String, Function<?, ?>> wrapFunctions(ObjectMapper mapper,
Map<String, Function<?, ?>> functions) {
Map<String, Function<?, ?>> result = new HashMap<>();
@@ -133,22 +122,48 @@ public class ContextFunctionCatalogAutoConfiguration {
return result;
}
public Map<String, Consumer<?>> wrapConsumers(ObjectMapper mapper,
Map<String, Consumer<?>> consumers) {
Map<String, Consumer<?>> result = new HashMap<>();
for (String key : consumers.keySet()) {
if (this.consumers.contains(key)) {
@SuppressWarnings("unchecked")
Consumer<Flux<?>> consumer = (Consumer<Flux<?>>) consumers.get(key);
result.put(key, wrapConsumer(consumer, mapper, key));
}
else {
result.put(key, consumers.get(key));
}
}
return result;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory factory)
throws BeansException {
for (String name : factory.getBeanDefinitionNames()) {
if (isGenericFunction(factory, name)) {
this.functions.add(name);
}
else if (isGenericSupplier(factory, name)) {
if (isGenericSupplier(factory, name)) {
this.suppliers.add(name);
}
else if (isGenericFunction(factory, name)) {
this.functions.add(name);
}
else if (isGenericConsumer(factory, name)) {
this.consumers.add(name);
}
}
}
private boolean isGenericSupplier(ConfigurableListableBeanFactory factory,
String name) {
return factory.isTypeMatch(name,
ResolvableType.forClassWithGenerics(Supplier.class, Flux.class))
&& !factory.isTypeMatch(name,
ResolvableType.forClassWithGenerics(Supplier.class,
ResolvableType.forClassWithGenerics(Flux.class,
String.class)));
}
private boolean isGenericFunction(ConfigurableListableBeanFactory factory,
String name) {
return factory.isTypeMatch(name,
@@ -162,16 +177,6 @@ public class ContextFunctionCatalogAutoConfiguration {
String.class)));
}
private boolean isGenericSupplier(ConfigurableListableBeanFactory factory,
String name) {
return factory.isTypeMatch(name,
ResolvableType.forClassWithGenerics(Supplier.class, Flux.class))
&& !factory.isTypeMatch(name,
ResolvableType.forClassWithGenerics(Supplier.class,
ResolvableType.forClassWithGenerics(Flux.class,
String.class)));
}
private boolean isGenericConsumer(ConfigurableListableBeanFactory factory,
String name) {
return factory.isTypeMatch(name,
@@ -182,6 +187,13 @@ public class ContextFunctionCatalogAutoConfiguration {
String.class)));
}
private ProxySupplier wrapSupplier(Supplier<Flux<?>> supplier,
ObjectMapper mapper) {
ProxySupplier wrapped = new ProxySupplier(mapper);
wrapped.setDelegate(supplier);
return wrapped;
}
private ProxyFunction wrapFunction(Function<Flux<?>, Flux<?>> function,
ObjectMapper mapper, String name) {
ProxyFunction wrapped = new ProxyFunction(mapper);
@@ -190,13 +202,6 @@ public class ContextFunctionCatalogAutoConfiguration {
return wrapped;
}
private ProxySupplier wrapSupplier(Supplier<Flux<?>> supplier,
ObjectMapper mapper) {
ProxySupplier wrapped = new ProxySupplier(mapper);
wrapped.setDelegate(supplier);
return wrapped;
}
private ProxyConsumer wrapConsumer(Consumer<Flux<?>> consumer,
ObjectMapper mapper, String name) {
ProxyConsumer wrapped = new ProxyConsumer(mapper);
@@ -283,21 +288,6 @@ abstract class ProxyWrapper<T> {
}
class ProxyFunction extends ProxyWrapper<Function<Flux<?>, Flux<?>>>
implements Function<Flux<String>, Flux<String>> {
@Autowired
public ProxyFunction(ObjectMapper mapper) {
super(mapper);
}
@Override
public Flux<String> apply(Flux<String> input) {
return getDelegate().apply(input.map(this::fromJson)).map(this::toJson);
}
}
class ProxySupplier extends ProxyWrapper<Supplier<Flux<?>>>
implements Supplier<Flux<String>> {
@@ -310,7 +300,20 @@ class ProxySupplier extends ProxyWrapper<Supplier<Flux<?>>>
public Flux<String> get() {
return getDelegate().get().map(this::toJson);
}
}
class ProxyFunction extends ProxyWrapper<Function<Flux<?>, Flux<?>>>
implements Function<Flux<String>, Flux<String>> {
@Autowired
public ProxyFunction(ObjectMapper mapper) {
super(mapper);
}
@Override
public Flux<String> apply(Flux<String> input) {
return getDelegate().apply(input.map(this::fromJson)).map(this::toJson);
}
}
class ProxyConsumer extends ProxyWrapper<Consumer<Flux<?>>>
@@ -325,5 +328,4 @@ class ProxyConsumer extends ProxyWrapper<Consumer<Flux<?>>>
public void accept(Flux<String> input) {
getDelegate().accept(input.map(this::fromJson));
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context;
import java.util.Map;
@@ -22,35 +23,40 @@ import java.util.function.Supplier;
import org.springframework.cloud.function.registry.FunctionCatalog;
public class ApplicationContextFunctionCatalog implements FunctionCatalog {
/**
* @author Dave Syer
* @author Mark Fisher
*/
public class InMemoryFunctionCatalog implements FunctionCatalog {
private final Map<String, Function<?, ?>> functions;
private final Map<String, Consumer<?>> consumers;
private final Map<String, Supplier<?>> suppliers;
public ApplicationContextFunctionCatalog(Map<String, Function<?, ?>> functions,
Map<String, Consumer<?>> consumers, Map<String, Supplier<?>> suppliers) {
public InMemoryFunctionCatalog(Map<String, Supplier<?>> suppliers,
Map<String, Function<?, ?>> functions, Map<String, Consumer<?>> consumers) {
this.suppliers = suppliers;
this.functions = functions;
this.consumers = consumers;
this.suppliers = suppliers;
}
@SuppressWarnings("unchecked")
@Override
public <T> Consumer<T> lookupConsumer(String name) {
return (Consumer<T>) consumers.get(name);
}
@SuppressWarnings("unchecked")
@Override
public <T, R> Function<T, R> lookupFunction(String name) {
return (Function<T, R>) functions.get(name);
}
@SuppressWarnings("unchecked")
@Override
public <T> Supplier<T> lookupSupplier(String name) {
return (Supplier<T>) suppliers.get(name);
}
@Override
@SuppressWarnings("unchecked")
public <T, R> Function<T, R> lookupFunction(String name) {
return (Function<T, R>) functions.get(name);
}
@Override
@SuppressWarnings("unchecked")
public <T> Consumer<T> lookupConsumer(String name) {
return (Consumer<T>) consumers.get(name);
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,9 +25,9 @@ import java.util.function.Supplier;
*/
public interface FunctionCatalog {
<T> Consumer<T> lookupConsumer(String name);
<T> Supplier<T> lookupSupplier(String name);
<T, R> Function<T, R> lookupFunction(String name);
<T> Supplier<T> lookupSupplier(String name);
<T> Consumer<T> lookupConsumer(String name);
}