SimpleFunctionRegistryTests: reactive function test case
FunctionTypeUtils#isTypeCollection: unwrap publisher JsonMessageConverter: handler for a ParameterizedType conversionHint refactor
This commit is contained in:
committed by
Oleg Zhurakousky
parent
fdbbcc4e45
commit
a97bdcafd4
@@ -63,14 +63,22 @@ public final class FunctionTypeUtils {
|
||||
* @return 'true' if this type represents a {@link Collection}. Otherwise 'false'.
|
||||
*/
|
||||
public static boolean isTypeCollection(Type type) {
|
||||
if (isMessage(type)) {
|
||||
type = getImmediateGenericType(type, 0);
|
||||
}
|
||||
type = getPayloadType(type);
|
||||
Type rawType = type instanceof ParameterizedType ? ((ParameterizedType) type).getRawType() : type;
|
||||
|
||||
return rawType instanceof Class<?> && Collection.class.isAssignableFrom((Class<?>) rawType);
|
||||
}
|
||||
|
||||
public static Type getPayloadType(Type type) {
|
||||
if (isPublisher(type)) {
|
||||
type = getImmediateGenericType(type, 0);
|
||||
}
|
||||
if (isMessage(type)) {
|
||||
type = getImmediateGenericType(type, 0);
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Will attempt to discover functional methods on the class. It's applicable for POJOs as well as
|
||||
* functional classes in `java.util.function` package. For the later the names of the methods are
|
||||
|
||||
@@ -763,7 +763,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
if (value instanceof Message<?>) { // see AWS adapter with Optional payload
|
||||
if (messageNeedsConversion(rawType, (Message<?>) value)) {
|
||||
convertedValue = FunctionTypeUtils.isTypeCollection(type)
|
||||
? messageConverter.fromMessage((Message<?>) value, (Class<?>) rawType, type)
|
||||
? messageConverter.fromMessage((Message<?>) value, (Class<?>) rawType, FunctionTypeUtils.getPayloadType(type))
|
||||
: messageConverter.fromMessage((Message<?>) value, (Class<?>) rawType);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Converted from Message: " + convertedValue);
|
||||
|
||||
@@ -16,6 +16,9 @@
|
||||
|
||||
package org.springframework.cloud.function.context.config;
|
||||
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.lang.reflect.Type;
|
||||
|
||||
import org.springframework.cloud.function.json.JsonMapper;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
@@ -71,7 +74,13 @@ public class JsonMessageConverter extends AbstractMessageConverter {
|
||||
if (targetClass.isInstance(message.getPayload())) {
|
||||
return message.getPayload();
|
||||
}
|
||||
Object result = jsonMapper.fromJson(message.getPayload(), targetClass);
|
||||
Object result = null;
|
||||
if (conversionHint == null) {
|
||||
result = jsonMapper.fromJson(message.getPayload(), targetClass);
|
||||
}
|
||||
else if (conversionHint instanceof ParameterizedType) {
|
||||
result = jsonMapper.fromJson(message.getPayload(), (Type) conversionHint);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuple3;
|
||||
|
||||
import org.springframework.cloud.function.context.FunctionType;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.messaging.Message;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
@@ -138,6 +139,14 @@ public class FunctionTypeUtilsTests {
|
||||
assertThat(Integer.class).isAssignableFrom(type.getOutputType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsTypeCollection() {
|
||||
assertThat(FunctionTypeUtils.isTypeCollection(new ParameterizedTypeReference<String>() { }.getType())).isFalse();
|
||||
assertThat(FunctionTypeUtils.isTypeCollection(new ParameterizedTypeReference<List<String>>() { }.getType())).isTrue();
|
||||
assertThat(FunctionTypeUtils.isTypeCollection(new ParameterizedTypeReference<Flux<List<String>>>() { }.getType())).isTrue();
|
||||
assertThat(FunctionTypeUtils.isTypeCollection(new ParameterizedTypeReference<Flux<Message<List<String>>>>() { }.getType())).isTrue();
|
||||
}
|
||||
|
||||
private static Function<String, Integer> function() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -17,11 +17,14 @@
|
||||
package org.springframework.cloud.function.context.catalog;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -204,6 +207,27 @@ public class SimpleFunctionRegistryTests {
|
||||
assertThat(result).isEqualTo("RATS");
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@Test
|
||||
public void testReactiveFunctionMessages() {
|
||||
FunctionRegistration<ReactiveFunction> registration = new FunctionRegistration<>(new ReactiveFunction(), "reactive")
|
||||
.type(FunctionType.of(ReactiveFunction.class));
|
||||
|
||||
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter);
|
||||
catalog.register(registration);
|
||||
|
||||
Function lookedUpFunction = catalog.lookup("reactive");
|
||||
|
||||
assertThat(lookedUpFunction).isNotNull();
|
||||
Flux<List<String>> result = (Flux<List<String>>) lookedUpFunction
|
||||
.apply(Flux.just(MessageBuilder
|
||||
.withPayload("[{\"name\":\"item1\"},{\"name\":\"item2\"}]")
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
|
||||
.build()
|
||||
));
|
||||
Assertions.assertIterableEquals(result.blockFirst(), Arrays.asList("item1", "item2"));
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@Test
|
||||
public void testWithCustomMessageConverter() {
|
||||
@@ -334,4 +358,14 @@ public class SimpleFunctionRegistryTests {
|
||||
|
||||
}
|
||||
|
||||
private static class ReactiveFunction implements Function<Flux<Message<List<Person>>>, Flux<List<String>>> {
|
||||
|
||||
@Override
|
||||
public Flux<List<String>> apply(Flux<Message<List<Person>>> listFlux) {
|
||||
return listFlux
|
||||
.map(Message::getPayload)
|
||||
.map(lst -> lst.stream().map(Person::getName).collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user