Add support for Message<Foo> in stream apps

The FunctionInspector needs to be able to distinguish between a
function of Flux<Foo> and a function of Flux<Message<Foo>>. Then
it can do the conversion a level below.

Only supports Message->Message or POJO->POJO (no mixtures), so there
is only one new method in FunctionInspector.

No support in the web endpoints yet. But it's probably not so hard
to add.
This commit is contained in:
Dave Syer
2017-06-15 12:32:46 +01:00
parent 797936fd0c
commit 0756dc3394
9 changed files with 307 additions and 21 deletions

View File

@@ -30,7 +30,8 @@ import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.util.MethodInvoker;
public class FunctionExtractingFunctionCatalog implements FunctionCatalog, FunctionInspector {
public class FunctionExtractingFunctionCatalog
implements FunctionCatalog, FunctionInspector {
private static Log logger = LogFactory
.getLog(FunctionExtractingFunctionCatalog.class);
@@ -65,6 +66,11 @@ public class FunctionExtractingFunctionCatalog implements FunctionCatalog, Funct
return (Supplier<T>) lookup(name, "lookupSupplier");
}
@Override
public boolean isMessage(String name) {
return (Boolean) inspect(name, "isMessage");
}
@Override
public Class<?> getInputType(String name) {
return (Class<?>) inspect(name, "getInputType");
@@ -112,14 +118,14 @@ public class FunctionExtractingFunctionCatalog implements FunctionCatalog, Funct
}
return invoke(FunctionInspector.class, method, arg);
}
private Object lookup(String name, String method) {
if (logger.isDebugEnabled()) {
logger.debug("Looking up " + name + " with " + method);
}
return invoke(FunctionCatalog.class, method, name);
}
private Object invoke(Class<?> type, String method, Object arg) {
for (String id : deployed) {
Object catalog = deployer.getBean(id, type);