Function wrapping BiConsumer improvements

When a BiConsumer user function is provided, we wrap it inside a Function
when registering it in FunctionRegstration. FunctionInvocationWrapper sees
this as a Function and downstream clients (such as Spring Cloud Stream) does
not have visibility into the user function type from a FunctionInvocationWrapper
API perspective. This commit propagates the BiConsumer targer user function info
as part of FunctionInvocationWrapper API.

For more information, see: https://github.com/spring-cloud/spring-cloud-stream/issues/2670
Resolves #1016
This commit is contained in:
Soby Chacko
2023-03-23 18:32:51 -04:00
committed by Oleg Zhurakousky
parent 98e88e7314
commit 0ba011a903
4 changed files with 63 additions and 5 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,6 +40,7 @@ import org.springframework.util.CollectionUtils;
* @param <T> target type
* @author Dave Syer
* @author Oleg Zhurakousky
* @author Soby Chacko
*/
public class FunctionRegistration<T> implements BeanNameAware {
@@ -61,6 +62,15 @@ public class FunctionRegistration<T> implements BeanNameAware {
private Type type;
/**
* In certain cased, {@link FunctionRegistration} needs to know details
* about the user function. For example, when the user provides a BiConsumer,
* we wrap that in a regular Function. There are actions that a downstream client
* needs to take based on the type information of the wrapped function such as
* not creating an output binding when the wrapped type is a BiConsumer.
*/
private Object userFunction;
/**
* Creates instance of FunctionRegistration.
* @param target instance of {@link Supplier}, {@link Function} or {@link Consumer}
@@ -180,4 +190,12 @@ public class FunctionRegistration<T> implements BeanNameAware {
this.name(name);
}
}
public Object getUserFunction() {
return userFunction;
}
public void setUserFunction(Object userFunction) {
this.userFunction = userFunction;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -60,6 +60,7 @@ import org.springframework.util.StringUtils;
* Implementation of {@link FunctionRegistry} capable of discovering functions in {@link BeanFactory}.
*
* @author Oleg Zhurakousky
* @author Soby Chacko
*/
public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry implements ApplicationContextAware {
@@ -228,7 +229,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
}
};
return new FunctionRegistration<>(wrapperFunction, functionName).type(biFunctionWrapperType);
FunctionRegistration<Function> functionRegistration = new FunctionRegistration<>(wrapperFunction, functionName).type(biFunctionWrapperType);
functionRegistration.setUserFunction(userFunction);
return functionRegistration;
}
private Object discoverFunctionInBeanFactory(String functionName) {

View File

@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -267,6 +268,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
FunctionInvocationWrapper function = functionRegistration != null
? this.invocationWrapperInstance(functionName, functionRegistration.getTarget(), functionRegistration.getType())
: null;
if (functionRegistration != null) {
Object userFunction = functionRegistration.getUserFunction();
if (userFunction instanceof BiConsumer && function != null) {
function.setWrappedBiConsumer(true);
}
}
if (functionRegistration != null && functionRegistration.getProperties().containsKey("singleton")) {
try {
function.isSingleton = Boolean.parseBoolean(functionRegistration.getProperties().get("singleton"));
@@ -415,6 +422,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
*/
private Function<Object, Object> enhancer;
private boolean wrappedBiConsumer;
FunctionInvocationWrapper(String functionDefinition, Object target, Type inputType, Type outputType) {
this.target = target;
this.inputType = this.normalizeType(inputType);
@@ -432,6 +441,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
}
}
public boolean isWrappedBiConsumer() {
return wrappedBiConsumer;
}
public void setWrappedBiConsumer(boolean wrappedBiConsumer) {
this.wrappedBiConsumer = wrappedBiConsumer;
}
public boolean isSkipOutputConversion() {
return skipOutputConversion;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -77,7 +78,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Oleg Zhurakousky
*
* @author Soby Chacko
*/
public class SimpleFunctionRegistryTests {
@@ -574,6 +575,25 @@ public class SimpleFunctionRegistryTests {
assertThat(consumerDowncounter.get()).isZero();
}
@Test
void biConsumerUserFunctionTypeIsKnownInFunctionInvocationWrapper() {
BiConsumer<Object, Object> testBiConsumer = (a, b) -> { };
Function<Object, Object> wrappedFunction = x -> {
testBiConsumer.accept(null, null);
return null;
};
FunctionRegistration<Function<Object, Object>> registration = new FunctionRegistration<>(
wrappedFunction, "functionWrappingBiConsumer").type(Function.class);
registration.setUserFunction(testBiConsumer);
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter,
new JacksonMapper(new ObjectMapper()));
catalog.register(registration);
FunctionInvocationWrapper functionInvocationWrapper = catalog.lookup("functionWrappingBiConsumer");
assertThat(functionInvocationWrapper.isWrappedBiConsumer()).isTrue();
}
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}