diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/InMemoryFunctionCatalog.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/InMemoryFunctionCatalog.java index 3c6382d1d..c143dd0b6 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/InMemoryFunctionCatalog.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/InMemoryFunctionCatalog.java @@ -94,4 +94,20 @@ public class InMemoryFunctionCatalog implements FunctionCatalog { public Consumer lookupConsumer(String name) { return (Consumer) consumers.get(name); } + + @Override + public Set getSupplierNames() { + return suppliers.keySet(); + } + + @Override + public Set getFunctionNames() { + return functions.keySet(); + } + + @Override + public Set getConsumerNames() { + return consumers.keySet(); + } + } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionCatalog.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionCatalog.java index 8b0ed5aff..7816ff5c3 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionCatalog.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionCatalog.java @@ -16,6 +16,8 @@ package org.springframework.cloud.function.registry; +import java.util.Collections; +import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -30,4 +32,10 @@ public interface FunctionCatalog { Function lookupFunction(String name); Consumer lookupConsumer(String name); + + default Set getSupplierNames() { return Collections.emptySet(); } + + default Set getFunctionNames() { return Collections.emptySet(); } + + default Set getConsumerNames() { return Collections.emptySet(); } } diff --git a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java index 8c05912ab..f0f0f42fc 100644 --- a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java +++ b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java @@ -66,6 +66,24 @@ public class FunctionExtractingFunctionCatalog return (Supplier) lookup(name, "lookupSupplier"); } + @SuppressWarnings("unchecked") + @Override + public Set getSupplierNames() { + return (Set) catalog("getSupplierNames"); + } + + @SuppressWarnings("unchecked") + @Override + public Set getFunctionNames() { + return (Set) catalog("getFunctionNames"); + } + + @SuppressWarnings("unchecked") + @Override + public Set getConsumerNames() { + return (Set) catalog("getConsumerNames"); + } + @Override public boolean isMessage(String name) { return (Boolean) inspect(name, "isMessage"); @@ -126,7 +144,14 @@ public class FunctionExtractingFunctionCatalog return invoke(FunctionCatalog.class, method, name); } - private Object invoke(Class type, String method, Object arg) { + private Object catalog(String method) { + if (logger.isDebugEnabled()) { + logger.debug("Calling " + method); + } + return invoke(FunctionCatalog.class, method); + } + + private Object invoke(Class type, String method, Object... arg) { for (String id : deployed) { Object catalog = deployer.getBean(id, type); if (catalog == null) { @@ -136,7 +161,7 @@ public class FunctionExtractingFunctionCatalog MethodInvoker invoker = new MethodInvoker(); invoker.setTargetObject(catalog); invoker.setTargetMethod(method); - invoker.setArguments(new Object[] { arg }); + invoker.setArguments(arg); invoker.prepare(); Object result = invoker.invoke(); if (result != null) { diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java index 3dabb9732..1715cda98 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java @@ -25,7 +25,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import org.springframework.beans.factory.ListableBeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionOutcome; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -63,10 +62,8 @@ public class StreamConfiguration { @Bean public SupplierInvokingMessageProducer supplierInvoker( - ListableBeanFactory beanFactory, FunctionCatalog registry) { - String[] names = beanFactory.getBeanNamesForType(Supplier.class, false, - false); - return new SupplierInvokingMessageProducer(registry, names); + FunctionCatalog registry) { + return new SupplierInvokingMessageProducer(registry); } } @@ -78,14 +75,11 @@ public class StreamConfiguration { private StreamConfigurationProperties properties; @Bean - public StreamListeningFunctionInvoker functionInvoker( - ListableBeanFactory beanFactory, FunctionCatalog registry, + public StreamListeningFunctionInvoker functionInvoker(FunctionCatalog registry, FunctionInspector functionInspector, @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { - String[] names = beanFactory.getBeanNamesForType(Function.class, false, - false); return new StreamListeningFunctionInvoker(registry, functionInspector, - compositeMessageConverterFactory, properties.getEndpoint(), names); + compositeMessageConverterFactory, properties.getEndpoint()); } } @@ -97,14 +91,11 @@ public class StreamConfiguration { private StreamConfigurationProperties properties; @Bean - public StreamListeningConsumerInvoker consumerInvoker( - ListableBeanFactory beanFactory, FunctionCatalog registry, + public StreamListeningConsumerInvoker consumerInvoker(FunctionCatalog registry, FunctionInspector functionInspector, @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { - String[] names = beanFactory.getBeanNamesForType(Consumer.class, false, - false); return new StreamListeningConsumerInvoker(registry, functionInspector, - compositeMessageConverterFactory, properties.getEndpoint(), names); + compositeMessageConverterFactory, properties.getEndpoint()); } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java index 59bc8e86e..7733bd51b 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java @@ -16,6 +16,7 @@ package org.springframework.cloud.function.stream; +import java.util.Set; import java.util.function.Function; import org.springframework.beans.factory.SmartInitializingSingleton; @@ -46,19 +47,15 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto private final String defaultEndpoint; - private final String[] names; - private static final String NOENDPOINT = "__NOENDPOINT__"; public StreamListeningConsumerInvoker(FunctionCatalog functionCatalog, FunctionInspector functionInspector, - CompositeMessageConverterFactory converterFactory, String defaultEndpoint, - String... names) { + CompositeMessageConverterFactory converterFactory, String defaultEndpoint) { this.functionCatalog = functionCatalog; this.functionInspector = functionInspector; this.converterFactory = converterFactory; this.defaultEndpoint = defaultEndpoint; - this.names = names; } @Override @@ -81,8 +78,9 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto private String select(Message input) { String name = defaultEndpoint; if (name == null) { - if (names.length == 1) { - name = names[0]; + Set names = functionCatalog.getConsumerNames(); + if (names.size() == 1) { + name = names.iterator().next(); } else { for (String candidate : names) { diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java index 2c1352c2b..7cb2fb903 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java @@ -16,6 +16,7 @@ package org.springframework.cloud.function.stream; +import java.util.Set; import java.util.function.Function; import org.springframework.beans.factory.SmartInitializingSingleton; @@ -48,19 +49,15 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto private final String defaultEndpoint; - private final String[] names; - private static final String NOENDPOINT = "__NOENDPOINT__"; public StreamListeningFunctionInvoker(FunctionCatalog functionCatalog, FunctionInspector functionInspector, - CompositeMessageConverterFactory converterFactory, String defaultEndpoint, - String... names) { + CompositeMessageConverterFactory converterFactory, String defaultEndpoint) { this.functionCatalog = functionCatalog; this.functionInspector = functionInspector; this.converterFactory = converterFactory; this.defaultEndpoint = defaultEndpoint; - this.names = names; } @Override @@ -84,8 +81,9 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto private String select(Message input) { String name = defaultEndpoint; if (name == null) { - if (names.length == 1) { - name = names[0]; + Set names = functionCatalog.getFunctionNames(); + if (names.size() == 1) { + name = names.iterator().next(); } else { for (String candidate : names) { diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java index 8ff92e03e..c6ceef0a7 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java @@ -33,11 +33,8 @@ public class SupplierInvokingMessageProducer extends MessageProducerSupport { private final FunctionCatalog functionCatalog; - private final String[] names; - - public SupplierInvokingMessageProducer(FunctionCatalog registry, String... names) { + public SupplierInvokingMessageProducer(FunctionCatalog registry) { this.functionCatalog = registry; - this.names = names; this.setOutputChannelName(Source.OUTPUT); } @@ -50,7 +47,7 @@ public class SupplierInvokingMessageProducer extends MessageProducerSupport { private Flux supplier() { Supplier> supplier = null; Flux result = Flux.empty(); - for (String name : names) { + for (String name : functionCatalog.getSupplierNames()) { supplier = functionCatalog.lookupSupplier(name); Assert.notNull(supplier, "Supplier must not be null"); result = Flux.merge(result, supplier.get());