|
|
|
|
@@ -17,9 +17,11 @@
|
|
|
|
|
package org.springframework.cloud.function.context.catalog;
|
|
|
|
|
|
|
|
|
|
import java.lang.reflect.Method;
|
|
|
|
|
import java.lang.reflect.ParameterizedType;
|
|
|
|
|
import java.lang.reflect.Type;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.function.BiFunction;
|
|
|
|
|
import java.util.function.Consumer;
|
|
|
|
|
import java.util.function.Function;
|
|
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
@@ -34,11 +36,13 @@ import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
|
|
|
|
|
import org.springframework.cloud.function.context.FunctionProperties;
|
|
|
|
|
import org.springframework.cloud.function.context.FunctionRegistration;
|
|
|
|
|
import org.springframework.cloud.function.context.FunctionRegistry;
|
|
|
|
|
import org.springframework.cloud.function.context.config.FunctionContextUtils;
|
|
|
|
|
import org.springframework.cloud.function.core.FunctionInvocationHelper;
|
|
|
|
|
import org.springframework.cloud.function.json.JsonMapper;
|
|
|
|
|
import org.springframework.context.ApplicationContext;
|
|
|
|
|
import org.springframework.context.ApplicationContextAware;
|
|
|
|
|
import org.springframework.context.support.GenericApplicationContext;
|
|
|
|
|
import org.springframework.core.ResolvableType;
|
|
|
|
|
import org.springframework.core.convert.ConversionService;
|
|
|
|
|
import org.springframework.lang.Nullable;
|
|
|
|
|
import org.springframework.messaging.Message;
|
|
|
|
|
@@ -138,6 +142,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
|
|
|
|
|
functionRegistration = this.applicationContext
|
|
|
|
|
.getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class);
|
|
|
|
|
}
|
|
|
|
|
else if (functionCandidate instanceof BiFunction) {
|
|
|
|
|
functionRegistration = this.registerMessagingBiFunction((BiFunction) functionCandidate, functionName);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext);
|
|
|
|
|
}
|
|
|
|
|
@@ -163,6 +170,36 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
|
|
|
|
|
return (T) function;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
|
|
|
|
private FunctionRegistration registerMessagingBiFunction(BiFunction userFunction, String functionName) {
|
|
|
|
|
Type biFunctionType = FunctionContextUtils.findType(this.applicationContext.getBeanFactory(), functionName);
|
|
|
|
|
Type inputType1 = Object.class;
|
|
|
|
|
Type inputType2 = Object.class;
|
|
|
|
|
if (biFunctionType instanceof ParameterizedType) {
|
|
|
|
|
inputType1 = ((ParameterizedType) biFunctionType).getActualTypeArguments()[0];
|
|
|
|
|
inputType2 = ((ParameterizedType) biFunctionType).getActualTypeArguments()[1];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!FunctionTypeUtils.isTypeMap(inputType2)) {
|
|
|
|
|
throw new UnsupportedOperationException("BiFunction's second argument must be assignable to Map, since BiFunction "
|
|
|
|
|
+ "represents parsed Message with first argument being payload and second headers. "
|
|
|
|
|
+ "Other signatures are not supported at the moment.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ResolvableType messageType = ResolvableType.forClassWithGenerics(Message.class, ResolvableType.forType(inputType1));
|
|
|
|
|
Type biFunctionWrapperType = ResolvableType.forClassWithGenerics(Function.class, messageType, ResolvableType.forType(inputType2)).getType();
|
|
|
|
|
|
|
|
|
|
Function wrapperFunction = message -> {
|
|
|
|
|
Object payload = ((Message) message).getPayload();
|
|
|
|
|
if (payload.getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) {
|
|
|
|
|
payload = null;
|
|
|
|
|
}
|
|
|
|
|
return userFunction.apply(payload, ((Message) message).getHeaders());
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
return new FunctionRegistration<>(wrapperFunction, functionName).type(biFunctionWrapperType);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Object discoverFunctionInBeanFactory(String functionName) {
|
|
|
|
|
Object functionCandidate = null;
|
|
|
|
|
if (this.applicationContext.containsBean(functionName)) {
|
|
|
|
|
|