From 39e4bed41212bfd764674b27dc66ae46b508e686 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 14 Jun 2022 09:08:31 +0200 Subject: [PATCH] GH-884 Add initial support for BiFunction Resolves #884 polish --- .../BeanFactoryAwareFunctionRegistry.java | 37 +++++++++++++++++++ .../context/catalog/FunctionTypeUtils.java | 9 +++++ .../SmartCompositeMessageConverter.java | 2 +- ...BeanFactoryAwareFunctionRegistryTests.java | 25 +++++++++---- 4 files changed, 64 insertions(+), 9 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index e8f7c8cfa..ffd90baad 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -17,9 +17,11 @@ package org.springframework.cloud.function.context.catalog; import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.Arrays; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -34,11 +36,13 @@ import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; +import org.springframework.cloud.function.context.config.FunctionContextUtils; import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.support.GenericApplicationContext; +import org.springframework.core.ResolvableType; import org.springframework.core.convert.ConversionService; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -138,6 +142,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp functionRegistration = this.applicationContext .getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class); } + else if (functionCandidate instanceof BiFunction) { + functionRegistration = this.registerMessagingBiFunction((BiFunction) functionCandidate, functionName); + } else { functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext); } @@ -163,6 +170,36 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp return (T) function; } + @SuppressWarnings({ "rawtypes", "unchecked" }) + private FunctionRegistration registerMessagingBiFunction(BiFunction userFunction, String functionName) { + Type biFunctionType = FunctionContextUtils.findType(this.applicationContext.getBeanFactory(), functionName); + Type inputType1 = Object.class; + Type inputType2 = Object.class; + if (biFunctionType instanceof ParameterizedType) { + inputType1 = ((ParameterizedType) biFunctionType).getActualTypeArguments()[0]; + inputType2 = ((ParameterizedType) biFunctionType).getActualTypeArguments()[1]; + } + + if (!FunctionTypeUtils.isTypeMap(inputType2)) { + throw new UnsupportedOperationException("BiFunction's second argument must be assignable to Map, since BiFunction " + + "represents parsed Message with first argument being payload and second headers. " + + "Other signatures are not supported at the moment."); + } + + ResolvableType messageType = ResolvableType.forClassWithGenerics(Message.class, ResolvableType.forType(inputType1)); + Type biFunctionWrapperType = ResolvableType.forClassWithGenerics(Function.class, messageType, ResolvableType.forType(inputType2)).getType(); + + Function wrapperFunction = message -> { + Object payload = ((Message) message).getPayload(); + if (payload.getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) { + payload = null; + } + return userFunction.apply(payload, ((Message) message).getHeaders()); + }; + + return new FunctionRegistration<>(wrapperFunction, functionName).type(biFunctionWrapperType); + } + private Object discoverFunctionInBeanFactory(String functionName) { Object functionCandidate = null; if (this.applicationContext.containsBean(functionName)) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java index 90176e2c6..f2d6a5595 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java @@ -102,6 +102,15 @@ public final class FunctionTypeUtils { return Collection.class.isAssignableFrom(rawType) || JsonNode.class.isAssignableFrom(rawType); } + public static boolean isTypeMap(Type type) { + if (Map.class.isAssignableFrom(getRawType(type))) { + return true; + } + type = getGenericType(type); + Class rawType = type instanceof ParameterizedType ? getRawType(type) : (Class) type; + return Map.class.isAssignableFrom(rawType); + } + public static boolean isTypeArray(Type type) { return getRawType(type).isArray(); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java index 50ea0c590..da17b2641 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java @@ -89,7 +89,7 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { for (Object item : iterablePayload) { boolean isConverted = false; if (item.getClass().getName().startsWith("org.springframework.kafka.support.KafkaNull")) { - resultList.add(item); + resultList.add(null); isConverted = true; } for (Iterator iterator = getConverters().iterator(); iterator.hasNext() && !isConverted;) { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java index 10dac65f5..b7d3dbcd5 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -184,18 +185,19 @@ public class BeanFactoryAwareFunctionRegistryTests { assertThat(((FunctionInvocationWrapper) function).isComposed()).isTrue(); } + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testBiFunction() { + FunctionCatalog catalog = this.configureCatalog(); + + Function biFunction = catalog.lookup("biFuncUpperCase"); + assertThat(biFunction.apply("hello")).isEqualTo("HELLO"); + } + @Test public void testImperativeFunction() { FunctionCatalog catalog = this.configureCatalog(); -// Function asIs = catalog.lookup("uppercase"); -// assertThat(asIs.apply("uppercase")).isEqualTo("UPPERCASE"); -// -// Function, Flux> asFlux = catalog.lookup("uppercase"); -// List result = asFlux.apply(Flux.just("uppercaseFlux", "uppercaseFlux2")).collectList().block(); -// assertThat(result.get(0)).isEqualTo("UPPERCASEFLUX"); -// assertThat(result.get(1)).isEqualTo("UPPERCASEFLUX2"); - Function>, Flux>> messageFlux = catalog.lookup("uppercase", "application/json"); Message message1 = MessageBuilder.withPayload("\"uppercaseFlux\"".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build(); Message message2 = MessageBuilder.withPayload("\"uppercaseFlux2\"".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build(); @@ -1056,6 +1058,13 @@ public class BeanFactoryAwareFunctionRegistryTests { return () -> "one"; } + @Bean + public BiFunction biFuncUpperCase() { + return (p, h) -> { + return p.toUpperCase(); + }; + } + @Bean public Function, Person> maptopojo() { return map -> {