Ensure FunctionTypes get registered for composite functions
Without this change the type of a composed function in the InMemoryFunctionCatalog is always null. The key is to register the type at the same time as the function is registered. Also some format and javadoc fixes (cosmetic)
This commit is contained in:
@@ -50,10 +50,9 @@ import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Base implementation of {@link FunctionRegistry} which supports
|
||||
* function composition during lookups. For example if this registry contains
|
||||
* function 'a' and 'b' you can compose them into a single function by
|
||||
* simply piping two names together during the
|
||||
* Base implementation of {@link FunctionRegistry} which supports function composition
|
||||
* during lookups. For example if this registry contains function 'a' and 'b' you can
|
||||
* compose them into a single function by simply piping two names together during the
|
||||
* lookup {@code this.lookup(Function.class, "a|b")}.
|
||||
*
|
||||
* Comma ',' is also supported as composition delimiter (e.g., {@code "a,b"}).
|
||||
@@ -62,9 +61,8 @@ import org.springframework.util.StringUtils;
|
||||
* @since 2.1
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractComposableFunctionRegistry
|
||||
implements FunctionRegistry, FunctionInspector,
|
||||
ApplicationEventPublisherAware, EnvironmentAware {
|
||||
public abstract class AbstractComposableFunctionRegistry implements FunctionRegistry,
|
||||
FunctionInspector, ApplicationEventPublisherAware, EnvironmentAware {
|
||||
|
||||
private final Map<String, Object> suppliers = new ConcurrentHashMap<>();
|
||||
|
||||
@@ -93,10 +91,12 @@ public abstract class AbstractComposableFunctionRegistry
|
||||
@Override
|
||||
public Set<String> getNames(Class<?> type) {
|
||||
if (type == null) {
|
||||
return new HashSet<String>(getSupplierNames()) {{
|
||||
addAll(getConsumerNames());
|
||||
addAll(getFunctionNames());
|
||||
} };
|
||||
return new HashSet<String>(getSupplierNames()) {
|
||||
{
|
||||
addAll(getConsumerNames());
|
||||
addAll(getFunctionNames());
|
||||
}
|
||||
};
|
||||
}
|
||||
if (Supplier.class.isAssignableFrom(type)) {
|
||||
return this.getSupplierNames();
|
||||
@@ -150,8 +150,10 @@ public abstract class AbstractComposableFunctionRegistry
|
||||
* The count of all Suppliers, Function and Consumers currently registered.
|
||||
* @return the count of all Suppliers, Function and Consumers currently registered.
|
||||
*/
|
||||
@Override
|
||||
public int size() {
|
||||
return getSupplierNames().size() + getFunctionNames().size() + getConsumerNames().size();
|
||||
return getSupplierNames().size() + getFunctionNames().size()
|
||||
+ getConsumerNames().size();
|
||||
}
|
||||
|
||||
public FunctionType getFunctionType(String name) {
|
||||
@@ -159,10 +161,9 @@ public abstract class AbstractComposableFunctionRegistry
|
||||
}
|
||||
|
||||
/**
|
||||
* A reverse lookup where one can determine the actual name of
|
||||
* the function reference.
|
||||
* @param function should be an instance of {@link Supplier},
|
||||
* {@link Function} or {@link Consumer};
|
||||
* A reverse lookup where one can determine the actual name of the function reference.
|
||||
* @param function should be an instance of {@link Supplier}, {@link Function} or
|
||||
* {@link Consumer};
|
||||
* @return the name of the function or null.
|
||||
*/
|
||||
public String lookupFunctionName(Object function) {
|
||||
@@ -227,9 +228,10 @@ public abstract class AbstractComposableFunctionRegistry
|
||||
if (functionType != null) {
|
||||
return functionType;
|
||||
}
|
||||
throw new IllegalStateException("Unless FunctionType is already available in FunctionRegistration, "
|
||||
+ "this operation must be overriden "
|
||||
+ "by the implementation of the FunctionRegistry.");
|
||||
throw new IllegalStateException(
|
||||
"Unless FunctionType is already available in FunctionRegistration, "
|
||||
+ "this operation must be overriden "
|
||||
+ "by the implementation of the FunctionRegistry.");
|
||||
}
|
||||
|
||||
protected void addSupplier(String name, Supplier<?> supplier) {
|
||||
@@ -275,7 +277,7 @@ public abstract class AbstractComposableFunctionRegistry
|
||||
return registration;
|
||||
}
|
||||
|
||||
private void addType(String name, FunctionType functionType) {
|
||||
protected void addType(String name, FunctionType functionType) {
|
||||
this.types.computeIfAbsent(name, str -> functionType);
|
||||
}
|
||||
|
||||
@@ -308,11 +310,13 @@ public abstract class AbstractComposableFunctionRegistry
|
||||
else {
|
||||
String[] stages = StringUtils.delimitedListToStringArray(name, "|");
|
||||
if (Stream.of(stages).allMatch(funcName -> contains(funcName))) {
|
||||
List<Object> composableFunctions = Stream.of(stages).map(funcName -> find(funcName))
|
||||
.collect(Collectors.toList());
|
||||
composedFunction = composableFunctions.stream().reduce((a, z) -> composeFunctions(a, z))
|
||||
List<Object> composableFunctions = Stream.of(stages)
|
||||
.map(funcName -> find(funcName)).collect(Collectors.toList());
|
||||
composedFunction = composableFunctions.stream()
|
||||
.reduce((a, z) -> composeFunctions(a, z))
|
||||
.orElseGet(() -> null);
|
||||
if (composedFunction != null && !this.types.containsKey(name) && this.types.containsKey(stages[0])
|
||||
if (composedFunction != null && !this.types.containsKey(name)
|
||||
&& this.types.containsKey(stages[0])
|
||||
&& this.types.containsKey(stages[stages.length - 1])) {
|
||||
FunctionType input = this.types.get(stages[0]);
|
||||
FunctionType output = this.types.get(stages[stages.length - 1]);
|
||||
@@ -339,17 +343,17 @@ public abstract class AbstractComposableFunctionRegistry
|
||||
}
|
||||
|
||||
private boolean contains(String name) {
|
||||
return getSupplierNames().contains(name)
|
||||
|| getFunctionNames().contains(name) || getConsumerNames().contains(name);
|
||||
return getSupplierNames().contains(name) || getFunctionNames().contains(name)
|
||||
|| getConsumerNames().contains(name);
|
||||
}
|
||||
|
||||
private Object find(String name) {
|
||||
Object result = suppliers.get(name);
|
||||
Object result = this.suppliers.get(name);
|
||||
if (result == null) {
|
||||
result = functions.get(name);
|
||||
result = this.functions.get(name);
|
||||
}
|
||||
if (result == null) {
|
||||
result = consumers.get(name);
|
||||
result = this.consumers.get(name);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
@@ -361,8 +365,8 @@ public abstract class AbstractComposableFunctionRegistry
|
||||
if (b instanceof FluxConsumer) {
|
||||
if (supplier instanceof FluxSupplier) {
|
||||
FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>) b);
|
||||
return (Supplier<Mono<Void>>) () -> Mono
|
||||
.from(supplier.get().compose(v -> fConsumer.apply(supplier.get())));
|
||||
return (Supplier<Mono<Void>>) () -> Mono.from(
|
||||
supplier.get().compose(v -> fConsumer.apply(supplier.get())));
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException(
|
||||
@@ -383,13 +387,15 @@ public abstract class AbstractComposableFunctionRegistry
|
||||
return function1.andThen(function2);
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException("The provided function is finite (i.e., returns Mono<?>) "
|
||||
+ "therefore it can *only* be composed with compatible function (i.e., Function<Mono, Flux>");
|
||||
throw new IllegalStateException(
|
||||
"The provided function is finite (i.e., returns Mono<?>) "
|
||||
+ "therefore it can *only* be composed with compatible function (i.e., Function<Mono, Flux>");
|
||||
}
|
||||
}
|
||||
else if (function2 instanceof FluxToMonoFunction) {
|
||||
return new FluxToMonoFunction<Object, Object>(((Function<Flux<Object>, Flux<Object>>) a)
|
||||
.andThen(((FluxToMonoFunction<Object, Object>) b).getTarget()));
|
||||
return new FluxToMonoFunction<Object, Object>(
|
||||
((Function<Flux<Object>, Flux<Object>>) a).andThen(
|
||||
((FluxToMonoFunction<Object, Object>) b).getTarget()));
|
||||
}
|
||||
else {
|
||||
return function1.andThen(function2);
|
||||
@@ -401,8 +407,8 @@ public abstract class AbstractComposableFunctionRegistry
|
||||
return (Consumer<Object>) v -> consumer.accept(function.apply(v));
|
||||
}
|
||||
else {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("Could not compose %s and %s", a.getClass(), b.getClass()));
|
||||
throw new IllegalArgumentException(String
|
||||
.format("Could not compose %s and %s", a.getClass(), b.getClass()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -429,4 +435,5 @@ public abstract class AbstractComposableFunctionRegistry
|
||||
}
|
||||
return function;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ public class InMemoryFunctionCatalog extends AbstractComposableFunctionRegistry
|
||||
Assert.notEmpty(functionRegistration.getNames(),
|
||||
"'registration' must contain at least one name before it is registered in catalog.");
|
||||
// TODO should we just delegate to wrap(..)????
|
||||
//wrap(functionRegistration, functionRegistration.getNames().iterator().next());
|
||||
// wrap(functionRegistration, functionRegistration.getNames().iterator().next());
|
||||
Class<?> type = Object.class;
|
||||
if (functionRegistration.getTarget() instanceof Function) {
|
||||
type = Function.class;
|
||||
@@ -81,6 +81,7 @@ public class InMemoryFunctionCatalog extends AbstractComposableFunctionRegistry
|
||||
}
|
||||
|
||||
for (String name : functionRegistration.getNames()) {
|
||||
addType(name, functionRegistration.getType());
|
||||
if (functionRegistration.getTarget() instanceof Function) {
|
||||
this.addFunction(name, (Function<?, ?>) functionRegistration.getTarget());
|
||||
}
|
||||
|
||||
@@ -73,13 +73,9 @@ import org.springframework.util.StringUtils;
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnMissingBean(FunctionCatalog.class)
|
||||
@ComponentScan(
|
||||
basePackages = "${spring.cloud.function.scan.packages:functions}",
|
||||
includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE,
|
||||
classes = {
|
||||
Supplier.class,
|
||||
Function.class,
|
||||
Consumer.class }))
|
||||
@ComponentScan(basePackages = "${spring.cloud.function.scan.packages:functions}", //
|
||||
includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {
|
||||
Supplier.class, Function.class, Consumer.class }))
|
||||
public class ContextFunctionCatalogAutoConfiguration {
|
||||
|
||||
static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper";
|
||||
@@ -89,8 +85,9 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
return new BeanFactoryFunctionCatalog();
|
||||
}
|
||||
|
||||
protected static class BeanFactoryFunctionCatalog extends AbstractComposableFunctionRegistry
|
||||
implements InitializingBean, BeanFactoryAware {
|
||||
protected static class BeanFactoryFunctionCatalog
|
||||
extends AbstractComposableFunctionRegistry
|
||||
implements InitializingBean, BeanFactoryAware {
|
||||
|
||||
private ApplicationEventPublisher applicationEventPublisher;
|
||||
|
||||
@@ -100,13 +97,15 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
public FunctionRegistration<?> getRegistration(Object function) {
|
||||
String functionName = this.lookupFunctionName(function);
|
||||
if (StringUtils.hasText(functionName)) {
|
||||
FunctionRegistration<?> registration = new FunctionRegistration<Object>(function, functionName);
|
||||
FunctionRegistration<?> registration = new FunctionRegistration<Object>(
|
||||
function, functionName);
|
||||
FunctionType functionType = this.findType(registration);
|
||||
return registration.type(functionType.getType());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void register(FunctionRegistration<T> functionRegistration) {
|
||||
Assert.notEmpty(functionRegistration.getNames(),
|
||||
"'registration' must contain at least one name before it is registered in catalog.");
|
||||
@@ -120,13 +119,13 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
@Override
|
||||
@SuppressWarnings("rawtypes")
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
Map<String, Supplier> supplierBeans = beanFactory
|
||||
Map<String, Supplier> supplierBeans = this.beanFactory
|
||||
.getBeansOfType(Supplier.class);
|
||||
Map<String, Function> functionBeans = beanFactory
|
||||
Map<String, Function> functionBeans = this.beanFactory
|
||||
.getBeansOfType(Function.class);
|
||||
Map<String, Consumer> consumerBeans = beanFactory
|
||||
Map<String, Consumer> consumerBeans = this.beanFactory
|
||||
.getBeansOfType(Consumer.class);
|
||||
Map<String, FunctionRegistration> functionRegistrationBeans = beanFactory
|
||||
Map<String, FunctionRegistration> functionRegistrationBeans = this.beanFactory
|
||||
.getBeansOfType(FunctionRegistration.class);
|
||||
this.doMerge(functionRegistrationBeans, consumerBeans, supplierBeans,
|
||||
functionBeans);
|
||||
@@ -168,7 +167,8 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
|
||||
if (functionType == null) {
|
||||
functionType = functionByNameExist(name)
|
||||
? new FunctionType(functionRegistration.getTarget().getClass()) : new FunctionType(
|
||||
? new FunctionType(functionRegistration.getTarget().getClass())
|
||||
: new FunctionType(
|
||||
FunctionContextUtils.findType(name, this.beanFactory));
|
||||
}
|
||||
|
||||
@@ -177,6 +177,11 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
|
||||
// @checkstyle:off
|
||||
/**
|
||||
* @param initial a registration
|
||||
* @param consumers consumers to register
|
||||
* @param suppliers suppliers to register
|
||||
* @param functions functions to register
|
||||
* @return a new registration
|
||||
* @deprecated Was never intended for public use.
|
||||
*/
|
||||
@Deprecated
|
||||
@@ -253,6 +258,7 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
registrations.forEach(registration -> wrap(registration,
|
||||
targets.get(registration.getTarget())));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class PreferGsonOrMissingJacksonCondition extends AnyNestedCondition {
|
||||
@@ -289,10 +295,8 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
@Configuration
|
||||
@ConditionalOnClass(ObjectMapper.class)
|
||||
@ConditionalOnBean(ObjectMapper.class)
|
||||
@ConditionalOnProperty(
|
||||
name = ContextFunctionCatalogAutoConfiguration.PREFERRED_MAPPER_PROPERTY,
|
||||
havingValue = "jackson",
|
||||
matchIfMissing = true)
|
||||
@ConditionalOnProperty(name = ContextFunctionCatalogAutoConfiguration.PREFERRED_MAPPER_PROPERTY, //
|
||||
havingValue = "jackson", matchIfMissing = true)
|
||||
protected static class JacksonConfiguration {
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -24,6 +24,8 @@ import reactor.core.publisher.Flux;
|
||||
import org.springframework.cloud.function.context.FunctionRegistration;
|
||||
import org.springframework.cloud.function.context.FunctionType;
|
||||
import org.springframework.cloud.function.core.FluxFunction;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -63,17 +65,44 @@ public class InMemoryFunctionCatalogTests {
|
||||
@Test
|
||||
public void testFunctionComposition() {
|
||||
FunctionRegistration<UpperCase> upperCaseRegistration = new FunctionRegistration<>(
|
||||
new UpperCase(), "uppercase").type(FunctionType.of(UpperCase.class).getType());
|
||||
new UpperCase(), "uppercase")
|
||||
.type(FunctionType.of(UpperCase.class).getType());
|
||||
FunctionRegistration<Reverse> reverseRegistration = new FunctionRegistration<>(
|
||||
new Reverse(), "reverse").type(FunctionType.of(Reverse.class).getType());
|
||||
InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog();
|
||||
catalog.register(upperCaseRegistration);
|
||||
catalog.register(reverseRegistration);
|
||||
|
||||
Function<Flux<String>, Flux<String>> lookedUpFunction = catalog.lookup("uppercase|reverse");
|
||||
Function<Flux<String>, Flux<String>> lookedUpFunction = catalog
|
||||
.lookup("uppercase|reverse");
|
||||
assertThat(catalog.getFunctionType("uppercase|reverse").isMessage()).isFalse();
|
||||
|
||||
assertThat(lookedUpFunction).isNotNull();
|
||||
assertThat(lookedUpFunction.apply(Flux.just("star")).blockFirst()).isEqualTo("RATS");
|
||||
assertThat(lookedUpFunction.apply(Flux.just("star")).blockFirst())
|
||||
.isEqualTo("RATS");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFunctionCompositionWithMessages() {
|
||||
FunctionRegistration<UpperCaseMessage> upperCaseRegistration = new FunctionRegistration<>(
|
||||
new UpperCaseMessage(), "uppercase")
|
||||
.type(FunctionType.of(UpperCaseMessage.class).getType());
|
||||
// TODO: make this work with plain Reverse (not message)
|
||||
FunctionRegistration<ReverseMessage> reverseRegistration = new FunctionRegistration<>(
|
||||
new ReverseMessage(), "reverse")
|
||||
.type(FunctionType.of(ReverseMessage.class).getType());
|
||||
InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog();
|
||||
catalog.register(upperCaseRegistration);
|
||||
catalog.register(reverseRegistration);
|
||||
|
||||
Function<Flux<Message<String>>, Flux<Message<String>>> lookedUpFunction = catalog
|
||||
.lookup("uppercase|reverse");
|
||||
assertThat(catalog.getFunctionType("uppercase|reverse").isMessage()).isTrue();
|
||||
|
||||
assertThat(lookedUpFunction).isNotNull();
|
||||
assertThat(lookedUpFunction
|
||||
.apply(Flux.just(MessageBuilder.withPayload("star").build())).blockFirst()
|
||||
.getPayload()).isEqualTo("RATS");
|
||||
}
|
||||
|
||||
private static class UpperCase implements Function<String, String> {
|
||||
@@ -85,6 +114,17 @@ public class InMemoryFunctionCatalogTests {
|
||||
|
||||
}
|
||||
|
||||
private static class UpperCaseMessage
|
||||
implements Function<Message<String>, Message<String>> {
|
||||
|
||||
@Override
|
||||
public Message<String> apply(Message<String> t) {
|
||||
return MessageBuilder.withPayload(t.getPayload().toUpperCase())
|
||||
.copyHeaders(t.getHeaders()).build();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class Reverse implements Function<String, String> {
|
||||
|
||||
@Override
|
||||
@@ -94,6 +134,18 @@ public class InMemoryFunctionCatalogTests {
|
||||
|
||||
}
|
||||
|
||||
private static class ReverseMessage
|
||||
implements Function<Message<String>, Message<String>> {
|
||||
|
||||
@Override
|
||||
public Message<String> apply(Message<String> t) {
|
||||
return MessageBuilder
|
||||
.withPayload(new StringBuilder(t.getPayload()).reverse().toString())
|
||||
.copyHeaders(t.getHeaders()).build();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class TestFunction implements Function<Integer, String> {
|
||||
|
||||
@Override
|
||||
|
||||
@@ -34,6 +34,7 @@ import org.springframework.cloud.function.context.FunctionType;
|
||||
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.BeanFactoryFunctionCatalog;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Dave Syer
|
||||
*
|
||||
@@ -198,7 +199,7 @@ public class BeanFactoryFunctionCatalogTests {
|
||||
Consumer<String> c = x -> ref.set(x.toUpperCase());
|
||||
this.processor.register(new FunctionRegistration<>(c, "consumer"));
|
||||
Supplier<Mono<Void>> f = this.processor.lookup("supplier|consumer");
|
||||
((Mono<Void>) f.get()).block();
|
||||
f.get().block();
|
||||
assertThat(ref.get()).isEqualTo("HELLO");
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user