GH-612: Fix conversion from JSON for Flux<Message<Person>>

* When using MappingJackson2MessageConverter
This commit is contained in:
Greg Eales
2020-11-22 14:47:48 +00:00
committed by Oleg Zhurakousky
parent 20746d1189
commit 5e5601e48e
2 changed files with 152 additions and 14 deletions

View File

@@ -78,8 +78,6 @@ import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
/**
*
* Basic implementation of FunctionRegistry which maintains the cache of registered functions while
@@ -822,17 +820,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
if (messageNeedsConversion(rawType, type, (Message<?>) value)) {
boolean convertWithHint = false;
Type hint = type;
if (FunctionTypeUtils.isTypeCollection(type)) {
hint = FunctionTypeUtils.getGenericType(type);
convertWithHint = true;
}
else if (!rawType.equals(type)) {
Type hint = FunctionTypeUtils.getGenericType(type);;
if (FunctionTypeUtils.isTypeCollection(type) || !rawType.equals(hint)) {
convertWithHint = true;
}
convertedValue = convertWithHint
? this.fromMessage((Message<?>) value, (Class<?>) rawType, FunctionTypeUtils.getGenericType(type))
? this.fromMessage((Message<?>) value, (Class<?>) rawType, hint)
: this.fromMessage((Message<?>) value, (Class<?>) rawType, null);
if (logger.isDebugEnabled()) {
logger.debug("Converted from Message: " + convertedValue);

View File

@@ -18,6 +18,7 @@ package org.springframework.cloud.function.context.catalog;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -53,6 +54,7 @@ import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
@@ -231,14 +233,14 @@ public class SimpleFunctionRegistryTests {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testReactiveFunctionMessages() {
FunctionRegistration<ReactiveFunction> registration = new FunctionRegistration<>(new ReactiveFunction(), "reactive")
.type(FunctionType.of(ReactiveFunction.class));
public void testReactiveFunctionWithListMessages() {
FunctionRegistration<ReactiveFunctionWithList> registration = new FunctionRegistration<>(new ReactiveFunctionWithList(), "reactiveWithList")
.type(FunctionType.of(ReactiveFunctionWithList.class));
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter);
catalog.register(registration);
Function lookedUpFunction = catalog.lookup("reactive");
Function lookedUpFunction = catalog.lookup("reactiveWithList");
assertThat(lookedUpFunction).isNotNull();
Flux<List<String>> result = (Flux<List<String>>) lookedUpFunction
@@ -250,6 +252,116 @@ public class SimpleFunctionRegistryTests {
Assertions.assertIterableEquals(result.blockFirst(), Arrays.asList("item1", "item2"));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
@Ignore
public void testReactiveFunctionWithListMessagesJackson() {
FunctionRegistration<ReactiveFunctionWithList> registration = new FunctionRegistration<>(new ReactiveFunctionWithList(), "reactiveWithList")
.type(FunctionType.of(ReactiveFunctionWithList.class));
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService,
new CompositeMessageConverter(Collections.singletonList(new MappingJackson2MessageConverter())));
catalog.register(registration);
Function lookedUpFunction = catalog.lookup("reactiveWithList");
assertThat(lookedUpFunction).isNotNull();
Flux<List<String>> result = (Flux<List<String>>) lookedUpFunction
.apply(Flux.just(MessageBuilder
.withPayload("[{\"name\":\"item1\"},{\"name\":\"item2\"}]")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.build()
));
Assertions.assertIterableEquals(result.blockFirst(), Arrays.asList("item1", "item2"));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testReactiveFunctionMessage() {
FunctionRegistration<ReactiveFunction> registration = new FunctionRegistration<>(new ReactiveFunction(), "reactive")
.type(FunctionType.of(ReactiveFunction.class));
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter);
catalog.register(registration);
Function lookedUpFunction = catalog.lookup("reactive");
assertThat(lookedUpFunction).isNotNull();
Flux<String> result = (Flux<String>) lookedUpFunction
.apply(Flux.just(MessageBuilder
.withPayload("{\"name\":\"item\"}")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.build()
));
Assertions.assertEquals(result.blockFirst(), "item");
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testReactiveFunctionMessageJackson() {
FunctionRegistration<ReactiveFunction> registration = new FunctionRegistration<>(new ReactiveFunction(), "reactive")
.type(FunctionType.of(ReactiveFunction.class));
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService,
new CompositeMessageConverter(Collections.singletonList(new MappingJackson2MessageConverter())));
catalog.register(registration);
Function lookedUpFunction = catalog.lookup("reactive");
assertThat(lookedUpFunction).isNotNull();
Flux<String> result = (Flux<String>) lookedUpFunction
.apply(Flux.just(MessageBuilder
.withPayload("{\"name\":\"item\"}")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.build()
));
Assertions.assertEquals(result.blockFirst(), "item");
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testReactiveFunctionWithHolderMessage() {
FunctionRegistration<ReactiveFunctionWithHolder> registration = new FunctionRegistration<>(new ReactiveFunctionWithHolder(), "reactive")
.type(FunctionType.of(ReactiveFunctionWithHolder.class));
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter);
catalog.register(registration);
Function lookedUpFunction = catalog.lookup("reactive");
assertThat(lookedUpFunction).isNotNull();
Flux<String> result = (Flux<String>) lookedUpFunction
.apply(Flux.just(MessageBuilder
.withPayload("{\"data\":{\"name\":\"item\"}}")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.build()
));
Assertions.assertEquals(result.blockFirst(), "item");
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
@Ignore
public void testReactiveFunctionWithHolderMessageJackson() {
FunctionRegistration<ReactiveFunctionWithHolder> registration = new FunctionRegistration<>(new ReactiveFunctionWithHolder(), "reactive")
.type(FunctionType.of(ReactiveFunctionWithHolder.class));
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService,
new CompositeMessageConverter(Collections.singletonList(new MappingJackson2MessageConverter())));
catalog.register(registration);
Function lookedUpFunction = catalog.lookup("reactive");
assertThat(lookedUpFunction).isNotNull();
Flux<String> result = (Flux<String>) lookedUpFunction
.apply(Flux.just(MessageBuilder
.withPayload("{\"data\":{\"name\":\"item\"}}")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.build()
));
Assertions.assertEquals(result.blockFirst(), "item");
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testWithCustomMessageConverter() {
@@ -381,7 +493,7 @@ public class SimpleFunctionRegistryTests {
}
private static class ReactiveFunction implements Function<Flux<Message<List<Person>>>, Flux<List<String>>> {
private static class ReactiveFunctionWithList implements Function<Flux<Message<List<Person>>>, Flux<List<String>>> {
@Override
public Flux<List<String>> apply(Flux<Message<List<Person>>> listFlux) {
@@ -391,4 +503,36 @@ public class SimpleFunctionRegistryTests {
}
}
private static class ReactiveFunction implements Function<Flux<Message<Person>>, Flux<String>> {
@Override
public Flux<String> apply(Flux<Message<Person>> flux) {
return flux
.map(Message::getPayload)
.map(Person::getName);
}
}
private static class ReactiveFunctionWithHolder implements Function<Flux<Message<GenericDataHolder<Person>>>, Flux<String>> {
@Override
public Flux<String> apply(Flux<Message<GenericDataHolder<Person>>> flux) {
return flux
.map(Message::getPayload)
.map(GenericDataHolder::getData)
.map(Person::getName);
}
}
public static class GenericDataHolder<T> {
private T data;
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
}
}