From 5447465cc8c8f9e8513a7441ac85a3eeb9fe25af Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 8 Nov 2021 11:09:51 +0100 Subject: [PATCH] GH-762 Fix condition for parsing JSON collection strings into individual messages Resolves #762 --- .../context/catalog/FunctionTypeUtils.java | 23 ++++ .../catalog/SimpleFunctionRegistry.java | 20 +++- .../catalog/SimpleFunctionRegistryTests.java | 107 ++++++++++++++++++ 3 files changed, 145 insertions(+), 5 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java index c25cfe219..b8aa1801c 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java @@ -23,6 +23,7 @@ import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -37,6 +38,8 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import org.springframework.beans.factory.FactoryBean; +import org.springframework.beans.factory.ListableBeanFactory; +import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionType; import org.springframework.cloud.function.context.config.FunctionContextUtils; @@ -45,7 +48,9 @@ import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.ResolvableType; import org.springframework.messaging.Message; import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; import org.springframework.util.ReflectionUtils; +import org.springframework.util.StringUtils; /** * Set of utility operations to interrogate function definitions. @@ -80,6 +85,10 @@ public final class FunctionTypeUtils { return Collection.class.isAssignableFrom(rawType); } + public static boolean isTypeArray(Type type) { + return getRawType(type).isArray(); + } + /** * A convenience method identical to {@link #getImmediateGenericType(Type, int)} * for cases when provided 'type' is {@link Publisher} or {@link Message}. @@ -271,9 +280,23 @@ public final class FunctionTypeUtils { type = FunctionType.of(FunctionContextUtils.findType(applicationContext.getBeanFactory(), functionBeanDefinitionName)).getType(); } } + else if (!(type instanceof ParameterizedType)) { + String beanDefinitionName = discoverBeanDefinitionNameByQualifier(applicationContext.getBeanFactory(), functionName); + if (StringUtils.hasText(beanDefinitionName)) { + type = FunctionType.of(FunctionContextUtils.findType(applicationContext.getBeanFactory(), beanDefinitionName)).getType(); + } + } return type; } + public static String discoverBeanDefinitionNameByQualifier(ListableBeanFactory beanFactory, String qualifier) { + Map beanMap = BeanFactoryAnnotationUtils.qualifiedBeansOfType(beanFactory, Object.class, qualifier); + if (!CollectionUtils.isEmpty(beanMap) && beanMap.size() == 1) { + return beanMap.keySet().iterator().next(); + } + return null; + } + @SuppressWarnings("unchecked") public static Type getOutputType(Type functionType) { assertSupportedTypes(functionType); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 101249b58..b3285b886 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -764,12 +764,22 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect if (input instanceof Message) { payload = ((Message) input).getPayload(); } - if (JsonMapper.isJsonStringRepresentsCollection(payload) && !FunctionTypeUtils.isTypeCollection(this.inputType)) { - payload = jsonMapper.fromJson(payload, List.class); + if (JsonMapper.isJsonStringRepresentsCollection(payload) + && !FunctionTypeUtils.isTypeCollection(this.inputType) && !FunctionTypeUtils.isTypeArray(this.inputType)) { MessageHeaders headers = ((Message) input).getHeaders(); - input = ((List) payload).stream() - .map(p -> MessageBuilder.withPayload(p).copyHeaders(headers).build()) - .collect(Collectors.toList()); + Collection collectionPayload = jsonMapper.fromJson(payload, Collection.class); + Class inputClass = FunctionTypeUtils.getRawType(this.inputType); + if (this.isInputTypeMessage()) { + inputClass = FunctionTypeUtils.getRawType(FunctionTypeUtils.getImmediateGenericType(this.inputType, 0)); + } + + if (!inputClass.isAssignableFrom(Object.class) && !inputClass.isAssignableFrom(byte[].class)) { + logger.debug("Converting JSON string representing collection to a list of Messages. Function '" + + this + "' will be invoked iteratively"); + input = collectionPayload.stream() + .map(p -> MessageBuilder.withPayload(p).copyHeaders(headers).build()) + .collect(Collectors.toList()); + } } } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java index 0bcf484ab..bde1afa5e 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -134,6 +135,83 @@ public class SimpleFunctionRegistryTests { assertThat(result).isEqualTo("{\"HELLO\":\"WORLD\"}"); } + @SuppressWarnings("unchecked") + @Test + public void testSCF762() { + SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter, + new JacksonMapper(new ObjectMapper())); + + FunctionRegistration reg1 = new FunctionRegistration<>( + new UpperCase(), "uppercase").type(FunctionType.of(UpperCase.class)); + catalog.register(reg1); + // + FunctionRegistration reg2 = new FunctionRegistration<>( + new UpperCaseMessage(), "uppercaseMessage").type(FunctionType.of(UpperCaseMessage.class)); + catalog.register(reg2); + // + FunctionRegistration reg3 = new FunctionRegistration<>( + new StringArrayFunction(), "stringArray").type(FunctionType.of(StringArrayFunction.class)); + catalog.register(reg3); + // + FunctionRegistration reg4 = new FunctionRegistration<>( + new TypelessFunction(), "typeless").type(FunctionType.of(TypelessFunction.class)); + catalog.register(reg4); + // + FunctionRegistration reg5 = new FunctionRegistration<>( + new ByteArrayFunction(), "typeless").type(FunctionType.of(ByteArrayFunction.class)); + catalog.register(reg5); + // + FunctionRegistration reg6 = new FunctionRegistration<>( + new StringListFunction(), "stringList").type(FunctionType.of(StringListFunction.class)); + catalog.register(reg6); + + Message collectionMessage = MessageBuilder.withPayload("[\"ricky\", \"julien\", \"bubbles\"]").build(); + Message singleValueMessage = MessageBuilder.withPayload("\"ricky\"").build(); + + FunctionInvocationWrapper lookedUpFunction = catalog.lookup("uppercase", "application/json"); + Object result = lookedUpFunction.apply(singleValueMessage); + assertThat(result).isInstanceOf(Message.class); + assertThat(((Message) result).getPayload()).isEqualTo("\"RICKY\"".getBytes()); + + result = lookedUpFunction.apply(collectionMessage); + assertThat(result).isInstanceOf(Flux.class); + List> collectionIfResults = Flux.from((Publisher>) result).collectList().block(); + assertThat(collectionIfResults.size()).isEqualTo(3); + assertThat(collectionIfResults.get(0).getPayload()).isEqualTo("\"RICKY\"".getBytes()); + assertThat(collectionIfResults.get(1).getPayload()).isEqualTo("\"JULIEN\"".getBytes()); + + lookedUpFunction = catalog.lookup("typeless", "application/json"); + result = lookedUpFunction.apply(singleValueMessage); + assertThat(result).isInstanceOf(Message.class); + assertThat(((Message) result).getPayload()).isEqualTo("\"ricky\"".getBytes()); + + result = lookedUpFunction.apply(collectionMessage); + assertThat(result).isInstanceOf(Message.class); + assertThat(((Message) result).getPayload()).isEqualTo("[\"ricky\", \"julien\", \"bubbles\"]".getBytes()); + + + lookedUpFunction = catalog.lookup("stringArray", "application/json"); + result = lookedUpFunction.apply(singleValueMessage); + assertThat(result).isInstanceOf(Message.class); + assertThat(((Message) result).getPayload()).isEqualTo("[\"ricky\"]".getBytes()); + + result = lookedUpFunction.apply(collectionMessage); + assertThat(result).isInstanceOf(Message.class); + assertThat(((Message) result).getPayload()).isEqualTo("[ricky, julien, bubbles]".getBytes()); + + + lookedUpFunction = catalog.lookup("stringList", "application/json"); + result = lookedUpFunction.apply(singleValueMessage); + assertThat(result).isInstanceOf(Message.class); + assertThat(((Message) result).getPayload()).isEqualTo("[\"ricky\"]".getBytes()); + + result = lookedUpFunction.apply(collectionMessage); + assertThat(result).isInstanceOf(Message.class); + System.out.println(new String(((Message) result).getPayload())); + assertThat(((Message) result).getPayload()).isEqualTo("[ricky, julien, bubbles]".getBytes()); + + } + @SuppressWarnings("unchecked") @Test public void testSCF588() { @@ -584,4 +662,33 @@ public class SimpleFunctionRegistryTests { .build(); } } + + private static class StringArrayFunction implements Function { + @Override + public String apply(String[] t) { + return Arrays.asList(t).toString(); + } + } + + private static class StringListFunction implements Function, String> { + @Override + public String apply(List t) { + return t.toString(); + } + } + + private static class TypelessFunction implements Function { + @Override + public String apply(Object t) { + return t.toString(); + } + } + + private static class ByteArrayFunction implements Function { + @Override + public String apply(byte[] t) { + return new String(t); + } + } + }