GH-884 Add initial support for BiConsumer

This commit is contained in:
Oleg Zhurakousky
2022-06-15 16:15:21 +02:00
parent 8789c7ebd7
commit 3e6a9ffb27

View File

@@ -21,6 +21,7 @@ import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -94,6 +95,10 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Supplier.class)));
registeredNames
.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Consumer.class)));
registeredNames
.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(BiFunction.class)));
registeredNames
.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(BiConsumer.class)));
registeredNames
.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(FunctionRegistration.class)));
}
@@ -143,8 +148,8 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
functionRegistration = this.applicationContext
.getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class);
}
else if (functionCandidate instanceof BiFunction) {
functionRegistration = this.registerMessagingBiFunction((BiFunction) functionCandidate, functionName);
else if (functionCandidate instanceof BiFunction || functionCandidate instanceof BiConsumer) {
functionRegistration = this.registerMessagingBiFunction(functionCandidate, functionName);
}
else {
functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext);
@@ -172,7 +177,7 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private FunctionRegistration registerMessagingBiFunction(BiFunction userFunction, String functionName) {
private FunctionRegistration registerMessagingBiFunction(Object userFunction, String functionName) {
Type biFunctionType = FunctionContextUtils.findType(this.applicationContext.getBeanFactory(), functionName);
Type inputType1 = Object.class;
Type inputType2 = Object.class;
@@ -195,7 +200,13 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
if (payload.getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) {
payload = null;
}
return userFunction.apply(payload, ((Message) message).getHeaders());
if (userFunction instanceof BiConsumer) {
((BiConsumer) userFunction).accept(payload, ((Message) message).getHeaders());
return null;
}
else {
return ((BiFunction) userFunction).apply(payload, ((Message) message).getHeaders());
}
};
return new FunctionRegistration<>(wrapperFunction, functionName).type(FunctionType.of(biFunctionWrapperType));