diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index c64f85e3c..c2e357925 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -21,6 +21,7 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.Arrays; import java.util.Set; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -94,6 +95,10 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp .addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Supplier.class))); registeredNames .addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(Consumer.class))); + registeredNames + .addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(BiFunction.class))); + registeredNames + .addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(BiConsumer.class))); registeredNames .addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(FunctionRegistration.class))); } @@ -143,8 +148,8 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp functionRegistration = this.applicationContext .getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class); } - else if (functionCandidate instanceof BiFunction) { - functionRegistration = this.registerMessagingBiFunction((BiFunction) functionCandidate, functionName); + else if (functionCandidate instanceof BiFunction || functionCandidate instanceof BiConsumer) { + functionRegistration = this.registerMessagingBiFunction(functionCandidate, functionName); } else { functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext); @@ -172,7 +177,7 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp } @SuppressWarnings({ "rawtypes", "unchecked" }) - private FunctionRegistration registerMessagingBiFunction(BiFunction userFunction, String functionName) { + private FunctionRegistration registerMessagingBiFunction(Object userFunction, String functionName) { Type biFunctionType = FunctionContextUtils.findType(this.applicationContext.getBeanFactory(), functionName); Type inputType1 = Object.class; Type inputType2 = Object.class; @@ -195,7 +200,13 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp if (payload.getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) { payload = null; } - return userFunction.apply(payload, ((Message) message).getHeaders()); + if (userFunction instanceof BiConsumer) { + ((BiConsumer) userFunction).accept(payload, ((Message) message).getHeaders()); + return null; + } + else { + return ((BiFunction) userFunction).apply(payload, ((Message) message).getHeaders()); + } }; return new FunctionRegistration<>(wrapperFunction, functionName).type(FunctionType.of(biFunctionWrapperType));