@@ -17,9 +17,11 @@
|
||||
package org.springframework.cloud.function.context.catalog;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
@@ -34,11 +36,14 @@ import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
|
||||
import org.springframework.cloud.function.context.FunctionProperties;
|
||||
import org.springframework.cloud.function.context.FunctionRegistration;
|
||||
import org.springframework.cloud.function.context.FunctionRegistry;
|
||||
import org.springframework.cloud.function.context.FunctionType;
|
||||
import org.springframework.cloud.function.context.config.FunctionContextUtils;
|
||||
import org.springframework.cloud.function.core.FunctionInvocationHelper;
|
||||
import org.springframework.cloud.function.json.JsonMapper;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.convert.ConversionService;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
@@ -138,6 +143,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
|
||||
functionRegistration = this.applicationContext
|
||||
.getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class);
|
||||
}
|
||||
else if (functionCandidate instanceof BiFunction) {
|
||||
functionRegistration = this.registerMessagingBiFunction((BiFunction) functionCandidate, functionName);
|
||||
}
|
||||
else {
|
||||
functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext);
|
||||
}
|
||||
@@ -163,6 +171,36 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
|
||||
return (T) function;
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
private FunctionRegistration registerMessagingBiFunction(BiFunction userFunction, String functionName) {
|
||||
Type biFunctionType = FunctionContextUtils.findType(this.applicationContext.getBeanFactory(), functionName);
|
||||
Type inputType1 = Object.class;
|
||||
Type inputType2 = Object.class;
|
||||
if (biFunctionType instanceof ParameterizedType) {
|
||||
inputType1 = ((ParameterizedType) biFunctionType).getActualTypeArguments()[0];
|
||||
inputType2 = ((ParameterizedType) biFunctionType).getActualTypeArguments()[1];
|
||||
}
|
||||
|
||||
if (!FunctionTypeUtils.isTypeMap(inputType2)) {
|
||||
throw new UnsupportedOperationException("BiFunction's second argument must be assignable to Map, since BiFunction "
|
||||
+ "represents parsed Message with first argument being payload and second headers. "
|
||||
+ "Other signatures are not supported at the moment.");
|
||||
}
|
||||
|
||||
ResolvableType messageType = ResolvableType.forClassWithGenerics(Message.class, ResolvableType.forType(inputType1));
|
||||
Type biFunctionWrapperType = ResolvableType.forClassWithGenerics(Function.class, messageType, ResolvableType.forType(inputType2)).getType();
|
||||
|
||||
Function wrapperFunction = message -> {
|
||||
Object payload = ((Message) message).getPayload();
|
||||
if (payload.getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) {
|
||||
payload = null;
|
||||
}
|
||||
return userFunction.apply(payload, ((Message) message).getHeaders());
|
||||
};
|
||||
|
||||
return new FunctionRegistration<>(wrapperFunction, functionName).type(FunctionType.of(biFunctionWrapperType));
|
||||
}
|
||||
|
||||
private Object discoverFunctionInBeanFactory(String functionName) {
|
||||
Object functionCandidate = null;
|
||||
if (this.applicationContext.containsBean(functionName)) {
|
||||
|
||||
@@ -88,6 +88,15 @@ public final class FunctionTypeUtils {
|
||||
return Collection.class.isAssignableFrom(rawType) || JsonNode.class.isAssignableFrom(rawType);
|
||||
}
|
||||
|
||||
public static boolean isTypeMap(Type type) {
|
||||
if (Map.class.isAssignableFrom(getRawType(type))) {
|
||||
return true;
|
||||
}
|
||||
type = getGenericType(type);
|
||||
Class<?> rawType = type instanceof ParameterizedType ? getRawType(type) : (Class<?>) type;
|
||||
return Map.class.isAssignableFrom(rawType);
|
||||
}
|
||||
|
||||
public static boolean isTypeArray(Type type) {
|
||||
return getRawType(type).isArray();
|
||||
}
|
||||
|
||||
@@ -89,7 +89,7 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
|
||||
for (Object item : iterablePayload) {
|
||||
boolean isConverted = false;
|
||||
if (item.getClass().getName().startsWith("org.springframework.kafka.support.KafkaNull")) {
|
||||
resultList.add(item);
|
||||
resultList.add(null);
|
||||
isConverted = true;
|
||||
}
|
||||
for (Iterator<MessageConverter> iterator = getConverters().iterator(); iterator.hasNext() && !isConverted;) {
|
||||
|
||||
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
@@ -185,18 +186,19 @@ public class BeanFactoryAwareFunctionRegistryTests {
|
||||
assertThat(((FunctionInvocationWrapper) function).isComposed()).isTrue();
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Test
|
||||
public void testBiFunction() {
|
||||
FunctionCatalog catalog = this.configureCatalog();
|
||||
|
||||
Function biFunction = catalog.lookup("biFuncUpperCase");
|
||||
assertThat(biFunction.apply("hello")).isEqualTo("HELLO");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testImperativeFunction() {
|
||||
FunctionCatalog catalog = this.configureCatalog();
|
||||
|
||||
// Function<String, String> asIs = catalog.lookup("uppercase");
|
||||
// assertThat(asIs.apply("uppercase")).isEqualTo("UPPERCASE");
|
||||
//
|
||||
// Function<Flux<String>, Flux<String>> asFlux = catalog.lookup("uppercase");
|
||||
// List<String> result = asFlux.apply(Flux.just("uppercaseFlux", "uppercaseFlux2")).collectList().block();
|
||||
// assertThat(result.get(0)).isEqualTo("UPPERCASEFLUX");
|
||||
// assertThat(result.get(1)).isEqualTo("UPPERCASEFLUX2");
|
||||
|
||||
Function<Flux<Message<byte[]>>, Flux<Message<byte[]>>> messageFlux = catalog.lookup("uppercase", "application/json");
|
||||
Message<byte[]> message1 = MessageBuilder.withPayload("\"uppercaseFlux\"".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build();
|
||||
Message<byte[]> message2 = MessageBuilder.withPayload("\"uppercaseFlux2\"".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build();
|
||||
@@ -1052,6 +1054,13 @@ public class BeanFactoryAwareFunctionRegistryTests {
|
||||
return () -> "one";
|
||||
}
|
||||
|
||||
@Bean
|
||||
public BiFunction<String, Map, String> biFuncUpperCase() {
|
||||
return (p, h) -> {
|
||||
return p.toUpperCase();
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<Map<String, Object>, Person> maptopojo() {
|
||||
return map -> {
|
||||
|
||||
Reference in New Issue
Block a user