GH-762 Fix condition for parsing JSON collection strings into individual messages
Resolves #762
This commit is contained in:
@@ -23,6 +23,7 @@ import java.lang.reflect.Type;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
@@ -37,6 +38,8 @@ import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.beans.factory.FactoryBean;
|
||||
import org.springframework.beans.factory.ListableBeanFactory;
|
||||
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
|
||||
import org.springframework.cloud.function.context.FunctionRegistration;
|
||||
import org.springframework.cloud.function.context.FunctionType;
|
||||
import org.springframework.cloud.function.context.config.FunctionContextUtils;
|
||||
@@ -45,7 +48,9 @@ import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Set of utility operations to interrogate function definitions.
|
||||
@@ -80,6 +85,10 @@ public final class FunctionTypeUtils {
|
||||
return Collection.class.isAssignableFrom(rawType);
|
||||
}
|
||||
|
||||
public static boolean isTypeArray(Type type) {
|
||||
return getRawType(type).isArray();
|
||||
}
|
||||
|
||||
/**
|
||||
* A convenience method identical to {@link #getImmediateGenericType(Type, int)}
|
||||
* for cases when provided 'type' is {@link Publisher} or {@link Message}.
|
||||
@@ -271,9 +280,23 @@ public final class FunctionTypeUtils {
|
||||
type = FunctionType.of(FunctionContextUtils.findType(applicationContext.getBeanFactory(), functionBeanDefinitionName)).getType();
|
||||
}
|
||||
}
|
||||
else if (!(type instanceof ParameterizedType)) {
|
||||
String beanDefinitionName = discoverBeanDefinitionNameByQualifier(applicationContext.getBeanFactory(), functionName);
|
||||
if (StringUtils.hasText(beanDefinitionName)) {
|
||||
type = FunctionType.of(FunctionContextUtils.findType(applicationContext.getBeanFactory(), beanDefinitionName)).getType();
|
||||
}
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
public static String discoverBeanDefinitionNameByQualifier(ListableBeanFactory beanFactory, String qualifier) {
|
||||
Map<String, Object> beanMap = BeanFactoryAnnotationUtils.qualifiedBeansOfType(beanFactory, Object.class, qualifier);
|
||||
if (!CollectionUtils.isEmpty(beanMap) && beanMap.size() == 1) {
|
||||
return beanMap.keySet().iterator().next();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Type getOutputType(Type functionType) {
|
||||
assertSupportedTypes(functionType);
|
||||
|
||||
@@ -764,12 +764,22 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
if (input instanceof Message) {
|
||||
payload = ((Message) input).getPayload();
|
||||
}
|
||||
if (JsonMapper.isJsonStringRepresentsCollection(payload) && !FunctionTypeUtils.isTypeCollection(this.inputType)) {
|
||||
payload = jsonMapper.fromJson(payload, List.class);
|
||||
if (JsonMapper.isJsonStringRepresentsCollection(payload)
|
||||
&& !FunctionTypeUtils.isTypeCollection(this.inputType) && !FunctionTypeUtils.isTypeArray(this.inputType)) {
|
||||
MessageHeaders headers = ((Message) input).getHeaders();
|
||||
input = ((List) payload).stream()
|
||||
.map(p -> MessageBuilder.withPayload(p).copyHeaders(headers).build())
|
||||
.collect(Collectors.toList());
|
||||
Collection collectionPayload = jsonMapper.fromJson(payload, Collection.class);
|
||||
Class inputClass = FunctionTypeUtils.getRawType(this.inputType);
|
||||
if (this.isInputTypeMessage()) {
|
||||
inputClass = FunctionTypeUtils.getRawType(FunctionTypeUtils.getImmediateGenericType(this.inputType, 0));
|
||||
}
|
||||
|
||||
if (!inputClass.isAssignableFrom(Object.class) && !inputClass.isAssignableFrom(byte[].class)) {
|
||||
logger.debug("Converting JSON string representing collection to a list of Messages. Function '"
|
||||
+ this + "' will be invoked iteratively");
|
||||
input = collectionPayload.stream()
|
||||
.map(p -> MessageBuilder.withPayload(p).copyHeaders(headers).build())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
|
||||
@@ -134,6 +135,83 @@ public class SimpleFunctionRegistryTests {
|
||||
assertThat(result).isEqualTo("{\"HELLO\":\"WORLD\"}");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testSCF762() {
|
||||
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter,
|
||||
new JacksonMapper(new ObjectMapper()));
|
||||
|
||||
FunctionRegistration<UpperCase> reg1 = new FunctionRegistration<>(
|
||||
new UpperCase(), "uppercase").type(FunctionType.of(UpperCase.class));
|
||||
catalog.register(reg1);
|
||||
//
|
||||
FunctionRegistration<UpperCaseMessage> reg2 = new FunctionRegistration<>(
|
||||
new UpperCaseMessage(), "uppercaseMessage").type(FunctionType.of(UpperCaseMessage.class));
|
||||
catalog.register(reg2);
|
||||
//
|
||||
FunctionRegistration<StringArrayFunction> reg3 = new FunctionRegistration<>(
|
||||
new StringArrayFunction(), "stringArray").type(FunctionType.of(StringArrayFunction.class));
|
||||
catalog.register(reg3);
|
||||
//
|
||||
FunctionRegistration<TypelessFunction> reg4 = new FunctionRegistration<>(
|
||||
new TypelessFunction(), "typeless").type(FunctionType.of(TypelessFunction.class));
|
||||
catalog.register(reg4);
|
||||
//
|
||||
FunctionRegistration<ByteArrayFunction> reg5 = new FunctionRegistration<>(
|
||||
new ByteArrayFunction(), "typeless").type(FunctionType.of(ByteArrayFunction.class));
|
||||
catalog.register(reg5);
|
||||
//
|
||||
FunctionRegistration<StringListFunction> reg6 = new FunctionRegistration<>(
|
||||
new StringListFunction(), "stringList").type(FunctionType.of(StringListFunction.class));
|
||||
catalog.register(reg6);
|
||||
|
||||
Message<String> collectionMessage = MessageBuilder.withPayload("[\"ricky\", \"julien\", \"bubbles\"]").build();
|
||||
Message<String> singleValueMessage = MessageBuilder.withPayload("\"ricky\"").build();
|
||||
|
||||
FunctionInvocationWrapper lookedUpFunction = catalog.lookup("uppercase", "application/json");
|
||||
Object result = lookedUpFunction.apply(singleValueMessage);
|
||||
assertThat(result).isInstanceOf(Message.class);
|
||||
assertThat(((Message<byte[]>) result).getPayload()).isEqualTo("\"RICKY\"".getBytes());
|
||||
|
||||
result = lookedUpFunction.apply(collectionMessage);
|
||||
assertThat(result).isInstanceOf(Flux.class);
|
||||
List<Message<byte[]>> collectionIfResults = Flux.from((Publisher<Message<byte[]>>) result).collectList().block();
|
||||
assertThat(collectionIfResults.size()).isEqualTo(3);
|
||||
assertThat(collectionIfResults.get(0).getPayload()).isEqualTo("\"RICKY\"".getBytes());
|
||||
assertThat(collectionIfResults.get(1).getPayload()).isEqualTo("\"JULIEN\"".getBytes());
|
||||
|
||||
lookedUpFunction = catalog.lookup("typeless", "application/json");
|
||||
result = lookedUpFunction.apply(singleValueMessage);
|
||||
assertThat(result).isInstanceOf(Message.class);
|
||||
assertThat(((Message<byte[]>) result).getPayload()).isEqualTo("\"ricky\"".getBytes());
|
||||
|
||||
result = lookedUpFunction.apply(collectionMessage);
|
||||
assertThat(result).isInstanceOf(Message.class);
|
||||
assertThat(((Message<byte[]>) result).getPayload()).isEqualTo("[\"ricky\", \"julien\", \"bubbles\"]".getBytes());
|
||||
|
||||
|
||||
lookedUpFunction = catalog.lookup("stringArray", "application/json");
|
||||
result = lookedUpFunction.apply(singleValueMessage);
|
||||
assertThat(result).isInstanceOf(Message.class);
|
||||
assertThat(((Message<byte[]>) result).getPayload()).isEqualTo("[\"ricky\"]".getBytes());
|
||||
|
||||
result = lookedUpFunction.apply(collectionMessage);
|
||||
assertThat(result).isInstanceOf(Message.class);
|
||||
assertThat(((Message<byte[]>) result).getPayload()).isEqualTo("[ricky, julien, bubbles]".getBytes());
|
||||
|
||||
|
||||
lookedUpFunction = catalog.lookup("stringList", "application/json");
|
||||
result = lookedUpFunction.apply(singleValueMessage);
|
||||
assertThat(result).isInstanceOf(Message.class);
|
||||
assertThat(((Message<byte[]>) result).getPayload()).isEqualTo("[\"ricky\"]".getBytes());
|
||||
|
||||
result = lookedUpFunction.apply(collectionMessage);
|
||||
assertThat(result).isInstanceOf(Message.class);
|
||||
System.out.println(new String(((Message<byte[]>) result).getPayload()));
|
||||
assertThat(((Message<byte[]>) result).getPayload()).isEqualTo("[ricky, julien, bubbles]".getBytes());
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testSCF588() {
|
||||
@@ -584,4 +662,33 @@ public class SimpleFunctionRegistryTests {
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
private static class StringArrayFunction implements Function<String[], String> {
|
||||
@Override
|
||||
public String apply(String[] t) {
|
||||
return Arrays.asList(t).toString();
|
||||
}
|
||||
}
|
||||
|
||||
private static class StringListFunction implements Function<List<String>, String> {
|
||||
@Override
|
||||
public String apply(List<String> t) {
|
||||
return t.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private static class TypelessFunction implements Function<Object, String> {
|
||||
@Override
|
||||
public String apply(Object t) {
|
||||
return t.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private static class ByteArrayFunction implements Function<byte[], String> {
|
||||
@Override
|
||||
public String apply(byte[] t) {
|
||||
return new String(t);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user