GH-413 Fix type discovery logic in BeanFactoryAwareFunctionRegistry
- added 'discoverFunctionTypeFromFunctionalObject' method to FunctionTypeUtils - added tests to reproduce and validate the issue
This commit is contained in:
@@ -219,23 +219,21 @@ public class BeanFactoryAwareFunctionRegistry
|
||||
*/
|
||||
List<String> names = Stream
|
||||
.concat(Stream.of(functionNames), Stream.concat(Stream.of(consumerNames), Stream.of(supplierNames))).collect(Collectors.toList());
|
||||
Object fnObject = null;
|
||||
|
||||
if (!ObjectUtils.isEmpty(names)) {
|
||||
Assert.isTrue(names.size() == 1, "Found more then one function in BeanFactory: " + names
|
||||
+ ". Consider providing 'spring.cloud.function.definition' property.");
|
||||
definition = names.get(0);
|
||||
fnObject = this.applicationContext.getBean(definition);
|
||||
}
|
||||
else {
|
||||
if (this.registrationsByName.size() > 0) {
|
||||
Assert.isTrue(this.registrationsByName.size() == 1, "Found more then one function in local registry");
|
||||
definition = this.registrationsByName.keySet().iterator().next();
|
||||
fnObject = this.registrationsByName.values().iterator().next().getTarget();
|
||||
}
|
||||
}
|
||||
|
||||
if (StringUtils.hasText(definition)) {
|
||||
Type functionType = discoverFunctionType(fnObject, definition);
|
||||
if (StringUtils.hasText(definition) && this.applicationContext.containsBean(definition)) {
|
||||
Type functionType = discoverFunctionType(this.applicationContext.getBean(definition), definition);
|
||||
if (!FunctionTypeUtils.isSupplier(functionType) && !FunctionTypeUtils.isFunction(functionType) && !FunctionTypeUtils.isConsumer(functionType)) {
|
||||
logger.info("Discovered functional instance of bean '" + definition + "' as a default function, however its "
|
||||
+ "function argument types can not be determined. Discarding.");
|
||||
|
||||
@@ -34,6 +34,7 @@ import org.reactivestreams.Publisher;
|
||||
import reactor.util.function.Tuple2;
|
||||
|
||||
import org.springframework.cloud.function.context.FunctionRegistration;
|
||||
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
@@ -105,6 +106,15 @@ public final class FunctionTypeUtils {
|
||||
return methods.get(0);
|
||||
}
|
||||
|
||||
public static Type discoverFunctionTypeFromFunctionalObject(Object functionalObject) {
|
||||
if (functionalObject instanceof FunctionInvocationWrapper) {
|
||||
return ((FunctionInvocationWrapper) functionalObject).getFunctionType();
|
||||
}
|
||||
else {
|
||||
return discoverFunctionTypeFromClass(functionalObject.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
public static Type discoverFunctionTypeFromClass(Class<?> functionalClass) {
|
||||
Assert.isTrue(isFunctional(functionalClass), "Type must be one of Supplier, Function or Consumer");
|
||||
Type[] generics = functionalClass.getGenericInterfaces();
|
||||
|
||||
@@ -332,6 +332,44 @@ public class FunctionDeployerTests {
|
||||
assertThat(result2.get(1)).isEqualTo("2");
|
||||
}
|
||||
|
||||
// same as previous test, but lookup is empty
|
||||
@Test
|
||||
public void testBootJarWithMultipleInputOutputEmptyLookup() {
|
||||
String[] args = new String[] {
|
||||
"--spring.cloud.function.location=target/it/bootjar-multi/target/bootjar-multi-1.0.0.RELEASE-exec.jar",
|
||||
"--spring.cloud.function.function-class=function.example.Repeater"
|
||||
};
|
||||
ApplicationContext context = SpringApplication.run(DeployerApplication.class, args);
|
||||
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
|
||||
|
||||
Function<Tuple2<Flux<Message<byte[]>>, Flux<Message<byte[]>>>, Tuple2<Flux<Message<byte[]>>, Flux<Message<byte[]>>>> function =
|
||||
catalog.lookup("", "application/json", "application/json");
|
||||
|
||||
Message<byte[]> msg1 = MessageBuilder.withPayload("\"one\"".getBytes()).build();
|
||||
Message<byte[]> msg2 = MessageBuilder.withPayload("\"two\"".getBytes()).build();
|
||||
Flux<Message<byte[]>> inputOne = Flux.just(msg1, msg2);
|
||||
|
||||
Message<byte[]> msgInt1 = MessageBuilder.withPayload("\"1\"".getBytes()).build();
|
||||
Message<byte[]> msgInt2 = MessageBuilder.withPayload("\"2\"".getBytes()).build();
|
||||
Flux<Message<byte[]>> inputTwo = Flux.just(msgInt1, msgInt2);
|
||||
|
||||
Tuple2<Flux<Message<byte[]>>, Flux<Message<byte[]>>> result = function.apply(Tuples.of(inputOne, inputTwo));
|
||||
List<String> result1 = new ArrayList<>();
|
||||
List<String> result2 = new ArrayList<>();
|
||||
result.getT1().subscribe(message -> {
|
||||
result1.add(new String(message.getPayload()));
|
||||
});
|
||||
result.getT2().subscribe(message -> {
|
||||
result2.add(new String(message.getPayload()));
|
||||
});
|
||||
|
||||
assertThat(result1.get(0)).isEqualTo("\"one\"");
|
||||
assertThat(result1.get(1)).isEqualTo("\"two\"");
|
||||
|
||||
assertThat(result2.get(0)).isEqualTo("3");
|
||||
assertThat(result2.get(1)).isEqualTo("2");
|
||||
}
|
||||
|
||||
@SpringBootApplication(proxyBeanMethods = false)
|
||||
private static class DeployerApplication {
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user