From 5e5601e48e120007217859c40cdb9d4feffd00d0 Mon Sep 17 00:00:00 2001 From: Greg Eales Date: Sun, 22 Nov 2020 14:47:48 +0000 Subject: [PATCH] GH-612: Fix conversion from JSON for Flux> * When using MappingJackson2MessageConverter --- .../catalog/SimpleFunctionRegistry.java | 12 +- .../catalog/SimpleFunctionRegistryTests.java | 154 +++++++++++++++++- 2 files changed, 152 insertions(+), 14 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index f7245777b..881634afd 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -78,8 +78,6 @@ import org.springframework.util.ObjectUtils; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; - - /** * * Basic implementation of FunctionRegistry which maintains the cache of registered functions while @@ -822,17 +820,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect if (messageNeedsConversion(rawType, type, (Message) value)) { boolean convertWithHint = false; - Type hint = type; - if (FunctionTypeUtils.isTypeCollection(type)) { - hint = FunctionTypeUtils.getGenericType(type); - convertWithHint = true; - } - else if (!rawType.equals(type)) { + Type hint = FunctionTypeUtils.getGenericType(type);; + if (FunctionTypeUtils.isTypeCollection(type) || !rawType.equals(hint)) { convertWithHint = true; } convertedValue = convertWithHint - ? this.fromMessage((Message) value, (Class) rawType, FunctionTypeUtils.getGenericType(type)) + ? this.fromMessage((Message) value, (Class) rawType, hint) : this.fromMessage((Message) value, (Class) rawType, null); if (logger.isDebugEnabled()) { logger.debug("Converted from Message: " + convertedValue); diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java index 3efc9a3f4..bedce401a 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java @@ -18,6 +18,7 @@ package org.springframework.cloud.function.context.catalog; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.function.Function; import java.util.function.Supplier; @@ -53,6 +54,7 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.AbstractMessageConverter; import org.springframework.messaging.converter.ByteArrayMessageConverter; import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.messaging.support.MessageBuilder; @@ -231,14 +233,14 @@ public class SimpleFunctionRegistryTests { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test - public void testReactiveFunctionMessages() { - FunctionRegistration registration = new FunctionRegistration<>(new ReactiveFunction(), "reactive") - .type(FunctionType.of(ReactiveFunction.class)); + public void testReactiveFunctionWithListMessages() { + FunctionRegistration registration = new FunctionRegistration<>(new ReactiveFunctionWithList(), "reactiveWithList") + .type(FunctionType.of(ReactiveFunctionWithList.class)); SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter); catalog.register(registration); - Function lookedUpFunction = catalog.lookup("reactive"); + Function lookedUpFunction = catalog.lookup("reactiveWithList"); assertThat(lookedUpFunction).isNotNull(); Flux> result = (Flux>) lookedUpFunction @@ -250,6 +252,116 @@ public class SimpleFunctionRegistryTests { Assertions.assertIterableEquals(result.blockFirst(), Arrays.asList("item1", "item2")); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + @Ignore + public void testReactiveFunctionWithListMessagesJackson() { + FunctionRegistration registration = new FunctionRegistration<>(new ReactiveFunctionWithList(), "reactiveWithList") + .type(FunctionType.of(ReactiveFunctionWithList.class)); + + SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, + new CompositeMessageConverter(Collections.singletonList(new MappingJackson2MessageConverter()))); + catalog.register(registration); + + Function lookedUpFunction = catalog.lookup("reactiveWithList"); + + assertThat(lookedUpFunction).isNotNull(); + Flux> result = (Flux>) lookedUpFunction + .apply(Flux.just(MessageBuilder + .withPayload("[{\"name\":\"item1\"},{\"name\":\"item2\"}]") + .setHeader(MessageHeaders.CONTENT_TYPE, "application/json") + .build() + )); + Assertions.assertIterableEquals(result.blockFirst(), Arrays.asList("item1", "item2")); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testReactiveFunctionMessage() { + FunctionRegistration registration = new FunctionRegistration<>(new ReactiveFunction(), "reactive") + .type(FunctionType.of(ReactiveFunction.class)); + + SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter); + catalog.register(registration); + + Function lookedUpFunction = catalog.lookup("reactive"); + + assertThat(lookedUpFunction).isNotNull(); + Flux result = (Flux) lookedUpFunction + .apply(Flux.just(MessageBuilder + .withPayload("{\"name\":\"item\"}") + .setHeader(MessageHeaders.CONTENT_TYPE, "application/json") + .build() + )); + Assertions.assertEquals(result.blockFirst(), "item"); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testReactiveFunctionMessageJackson() { + FunctionRegistration registration = new FunctionRegistration<>(new ReactiveFunction(), "reactive") + .type(FunctionType.of(ReactiveFunction.class)); + + SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, + new CompositeMessageConverter(Collections.singletonList(new MappingJackson2MessageConverter()))); + catalog.register(registration); + + Function lookedUpFunction = catalog.lookup("reactive"); + + assertThat(lookedUpFunction).isNotNull(); + Flux result = (Flux) lookedUpFunction + .apply(Flux.just(MessageBuilder + .withPayload("{\"name\":\"item\"}") + .setHeader(MessageHeaders.CONTENT_TYPE, "application/json") + .build() + )); + Assertions.assertEquals(result.blockFirst(), "item"); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testReactiveFunctionWithHolderMessage() { + FunctionRegistration registration = new FunctionRegistration<>(new ReactiveFunctionWithHolder(), "reactive") + .type(FunctionType.of(ReactiveFunctionWithHolder.class)); + + SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter); + catalog.register(registration); + + Function lookedUpFunction = catalog.lookup("reactive"); + + assertThat(lookedUpFunction).isNotNull(); + Flux result = (Flux) lookedUpFunction + .apply(Flux.just(MessageBuilder + .withPayload("{\"data\":{\"name\":\"item\"}}") + .setHeader(MessageHeaders.CONTENT_TYPE, "application/json") + .build() + )); + Assertions.assertEquals(result.blockFirst(), "item"); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + @Ignore + public void testReactiveFunctionWithHolderMessageJackson() { + FunctionRegistration registration = new FunctionRegistration<>(new ReactiveFunctionWithHolder(), "reactive") + .type(FunctionType.of(ReactiveFunctionWithHolder.class)); + + SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, + new CompositeMessageConverter(Collections.singletonList(new MappingJackson2MessageConverter()))); + catalog.register(registration); + + Function lookedUpFunction = catalog.lookup("reactive"); + + assertThat(lookedUpFunction).isNotNull(); + Flux result = (Flux) lookedUpFunction + .apply(Flux.just(MessageBuilder + .withPayload("{\"data\":{\"name\":\"item\"}}") + .setHeader(MessageHeaders.CONTENT_TYPE, "application/json") + .build() + )); + Assertions.assertEquals(result.blockFirst(), "item"); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testWithCustomMessageConverter() { @@ -381,7 +493,7 @@ public class SimpleFunctionRegistryTests { } - private static class ReactiveFunction implements Function>>, Flux>> { + private static class ReactiveFunctionWithList implements Function>>, Flux>> { @Override public Flux> apply(Flux>> listFlux) { @@ -391,4 +503,36 @@ public class SimpleFunctionRegistryTests { } } + private static class ReactiveFunction implements Function>, Flux> { + + @Override + public Flux apply(Flux> flux) { + return flux + .map(Message::getPayload) + .map(Person::getName); + } + } + + private static class ReactiveFunctionWithHolder implements Function>>, Flux> { + + @Override + public Flux apply(Flux>> flux) { + return flux + .map(Message::getPayload) + .map(GenericDataHolder::getData) + .map(Person::getName); + } + } + + public static class GenericDataHolder { + private T data; + + public T getData() { + return data; + } + + public void setData(T data) { + this.data = data; + } + } }