From a97bdcafd439ca879726dcc5d34d4cbcd1e87838 Mon Sep 17 00:00:00 2001 From: anshlykov Date: Sat, 13 Jun 2020 03:02:52 +0300 Subject: [PATCH] SimpleFunctionRegistryTests: reactive function test case FunctionTypeUtils#isTypeCollection: unwrap publisher JsonMessageConverter: handler for a ParameterizedType conversionHint refactor --- .../context/catalog/FunctionTypeUtils.java | 14 ++++++-- .../catalog/SimpleFunctionRegistry.java | 2 +- .../context/config/JsonMessageConverter.java | 11 +++++- .../catalog/FunctionTypeUtilsTests.java | 9 +++++ .../catalog/SimpleFunctionRegistryTests.java | 34 +++++++++++++++++++ 5 files changed, 65 insertions(+), 5 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java index e3f241eaa..5ec671edb 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java @@ -63,14 +63,22 @@ public final class FunctionTypeUtils { * @return 'true' if this type represents a {@link Collection}. Otherwise 'false'. */ public static boolean isTypeCollection(Type type) { - if (isMessage(type)) { - type = getImmediateGenericType(type, 0); - } + type = getPayloadType(type); Type rawType = type instanceof ParameterizedType ? ((ParameterizedType) type).getRawType() : type; return rawType instanceof Class && Collection.class.isAssignableFrom((Class) rawType); } + public static Type getPayloadType(Type type) { + if (isPublisher(type)) { + type = getImmediateGenericType(type, 0); + } + if (isMessage(type)) { + type = getImmediateGenericType(type, 0); + } + return type; + } + /** * Will attempt to discover functional methods on the class. It's applicable for POJOs as well as * functional classes in `java.util.function` package. For the later the names of the methods are diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index f8e6f83ee..5316c5498 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -763,7 +763,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect if (value instanceof Message) { // see AWS adapter with Optional payload if (messageNeedsConversion(rawType, (Message) value)) { convertedValue = FunctionTypeUtils.isTypeCollection(type) - ? messageConverter.fromMessage((Message) value, (Class) rawType, type) + ? messageConverter.fromMessage((Message) value, (Class) rawType, FunctionTypeUtils.getPayloadType(type)) : messageConverter.fromMessage((Message) value, (Class) rawType); if (logger.isDebugEnabled()) { logger.debug("Converted from Message: " + convertedValue); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java index 81ad30154..c46953cbf 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java @@ -16,6 +16,9 @@ package org.springframework.cloud.function.context.config; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; + import org.springframework.cloud.function.json.JsonMapper; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -71,7 +74,13 @@ public class JsonMessageConverter extends AbstractMessageConverter { if (targetClass.isInstance(message.getPayload())) { return message.getPayload(); } - Object result = jsonMapper.fromJson(message.getPayload(), targetClass); + Object result = null; + if (conversionHint == null) { + result = jsonMapper.fromJson(message.getPayload(), targetClass); + } + else if (conversionHint instanceof ParameterizedType) { + result = jsonMapper.fromJson(message.getPayload(), (Type) conversionHint); + } return result; } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtilsTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtilsTests.java index beab4b7d2..46a073423 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtilsTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtilsTests.java @@ -32,6 +32,7 @@ import reactor.util.function.Tuple2; import reactor.util.function.Tuple3; import org.springframework.cloud.function.context.FunctionType; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.messaging.Message; import static org.assertj.core.api.Assertions.assertThat; @@ -138,6 +139,14 @@ public class FunctionTypeUtilsTests { assertThat(Integer.class).isAssignableFrom(type.getOutputType()); } + @Test + public void testIsTypeCollection() { + assertThat(FunctionTypeUtils.isTypeCollection(new ParameterizedTypeReference() { }.getType())).isFalse(); + assertThat(FunctionTypeUtils.isTypeCollection(new ParameterizedTypeReference>() { }.getType())).isTrue(); + assertThat(FunctionTypeUtils.isTypeCollection(new ParameterizedTypeReference>>() { }.getType())).isTrue(); + assertThat(FunctionTypeUtils.isTypeCollection(new ParameterizedTypeReference>>>() { }.getType())).isTrue(); + } + private static Function function() { return null; } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java index 483eba4d9..ac15afbda 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java @@ -17,11 +17,14 @@ package org.springframework.cloud.function.context.catalog; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import com.google.gson.Gson; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -204,6 +207,27 @@ public class SimpleFunctionRegistryTests { assertThat(result).isEqualTo("RATS"); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testReactiveFunctionMessages() { + FunctionRegistration registration = new FunctionRegistration<>(new ReactiveFunction(), "reactive") + .type(FunctionType.of(ReactiveFunction.class)); + + SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter); + catalog.register(registration); + + Function lookedUpFunction = catalog.lookup("reactive"); + + assertThat(lookedUpFunction).isNotNull(); + Flux> result = (Flux>) lookedUpFunction + .apply(Flux.just(MessageBuilder + .withPayload("[{\"name\":\"item1\"},{\"name\":\"item2\"}]") + .setHeader(MessageHeaders.CONTENT_TYPE, "application/json") + .build() + )); + Assertions.assertIterableEquals(result.blockFirst(), Arrays.asList("item1", "item2")); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testWithCustomMessageConverter() { @@ -334,4 +358,14 @@ public class SimpleFunctionRegistryTests { } + private static class ReactiveFunction implements Function>>, Flux>> { + + @Override + public Flux> apply(Flux>> listFlux) { + return listFlux + .map(Message::getPayload) + .map(lst -> lst.stream().map(Person::getName).collect(Collectors.toList())); + } + } + }