Change FunctionCatalog to key off Class<?>

Makes it possible to support other "function" types in the future.
The user is always taking a risk with the lookup that the object
returned has the generic type desired (but that hasn't changed
with this commit). FunctionCatalog is a lot simpler as a result
and also a lot more flexible.
This commit is contained in:
Dave Syer
2018-02-27 10:22:44 +00:00
parent c11a4454ff
commit 33b33adb4b
21 changed files with 498 additions and 407 deletions

View File

@@ -15,6 +15,8 @@
*/
package org.springframework.cloud.function.stream.config;
import java.util.function.Supplier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@@ -36,7 +38,7 @@ public class RouteRegistryAutoConfiguration {
@Bean
public RouteRegistry supplierRoutes(FunctionCatalog registry) {
return () -> registry.getSupplierNames();
return () -> registry.getNames(Supplier.class);
}
}

View File

@@ -88,7 +88,7 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
}
private Flux<Message<?>> function(String name, Flux<Message<?>> flux) {
Function<Object, Flux<?>> function = functionCatalog.lookupFunction(name);
Function<Object, Flux<?>> function = functionCatalog.lookup(Function.class, name);
return flux.publish(values -> {
Flux<?> result = function
.apply(values.map(message -> convertInput(function).apply(message)));
@@ -114,7 +114,7 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
}
private Flux<Message<?>> consumer(String name, Flux<Message<?>> flux) {
Consumer<Object> consumer = functionCatalog.lookupConsumer(name);
Consumer<Object> consumer = functionCatalog.lookup(Consumer.class, name);
consumer.accept(flux.map(message -> convertInput(consumer).apply(message))
.filter(transformed -> transformed != UNCONVERTED));
return Flux.empty();
@@ -125,7 +125,7 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
return Flux.empty();
}
String name = choose(names);
if (functionCatalog.lookupConsumer(name) != null) {
if (functionCatalog.lookup(Consumer.class, name) != null) {
return consumer(name, flux);
}
return function(name, flux);
@@ -149,8 +149,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
name = stash(defaultRoute);
}
if (name == null) {
Set<String> names = new LinkedHashSet<>(functionCatalog.getFunctionNames());
names.addAll(functionCatalog.getConsumerNames());
Set<String> names = new LinkedHashSet<>(functionCatalog.getNames(Function.class));
names.addAll(functionCatalog.getNames(Consumer.class));
List<String> matches = new ArrayList<>();
if (names.size() == 1) {
String key = names.iterator().next();
@@ -158,9 +158,9 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
}
else {
for (String candidate : names) {
Object function = functionCatalog.lookupFunction(candidate);
Object function = functionCatalog.lookup(Function.class, candidate);
if (function == null) {
function = functionCatalog.lookupConsumer(candidate);
function = functionCatalog.lookup(Consumer.class, candidate);
}
if (function == null) {
continue;
@@ -187,13 +187,13 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
}
private String stash(String key) {
if (functionCatalog.lookupFunction(key) != null) {
if (functionCatalog.lookup(Function.class, key) != null) {
if (!processors.containsKey(key)) {
processors.put(key, flux -> function(key, flux));
}
return key;
}
else if (functionCatalog.lookupConsumer(key) != null) {
else if (functionCatalog.lookup(Consumer.class, key) != null) {
if (!processors.containsKey(key)) {
processors.put(key, flux -> consumer(key, flux));
}

View File

@@ -51,7 +51,7 @@ public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
@Override
protected void doStart() {
for (String name : functionCatalog.getSupplierNames()) {
for (String name : functionCatalog.getNames(Supplier.class)) {
start(name);
}
}
@@ -83,7 +83,7 @@ public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
if (!disposables.containsKey(name)) {
synchronized (disposables) {
if (!disposables.containsKey(name)) {
Supplier<Flux<?>> supplier = functionCatalog.lookupSupplier(name);
Supplier<Flux<?>> supplier = functionCatalog.lookup(Supplier.class, name);
if (supplier != null) {
suppliers.add(name);
disposables.put(name,
@@ -96,7 +96,7 @@ public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
}
private void send(String name, Object payload) {
Supplier<Flux<?>> supplier = functionCatalog.lookupSupplier(name);
Supplier<Flux<?>> supplier = functionCatalog.lookup(Supplier.class, name);
Message<?> message = MessageUtils.unpack(supplier, payload);
message = MessageBuilder.fromMessage(message)
.setHeaderIfAbsent(StreamConfigurationProperties.ROUTE_KEY, name).build();