diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java index b4effd5fb..5b9a6d12f 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java @@ -32,6 +32,7 @@ import java.util.function.Function; import java.util.function.Supplier; import com.fasterxml.jackson.databind.ObjectMapper; +import reactor.core.publisher.Flux; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; @@ -39,9 +40,9 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor; -import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.cloud.function.registry.FunctionCatalog; @@ -53,13 +54,12 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.ResolvableType; import org.springframework.core.annotation.AnnotatedElementUtils; +import org.springframework.core.io.FileSystemResource; import org.springframework.core.type.StandardMethodMetadata; import org.springframework.stereotype.Component; import org.springframework.util.ClassUtils; import org.springframework.util.ReflectionUtils; -import reactor.core.publisher.Flux; - /** * @author Dave Syer * @author Mark Fisher @@ -209,7 +209,7 @@ public class ContextFunctionCatalogAutoConfiguration { else if (!isFluxSupplier(key, target)) { @SuppressWarnings({ "unchecked", "rawtypes" }) FluxSupplier value = new FluxSupplier(target); - return value; + return wrapSupplier(value, mapper, key); } else { return target; @@ -226,7 +226,7 @@ public class ContextFunctionCatalogAutoConfiguration { else if (!isFluxFunction(key, target)) { @SuppressWarnings({ "unchecked", "rawtypes" }) FluxFunction value = new FluxFunction(target); - return value; + return wrapFunction(value, mapper, key); } else { return target; @@ -242,7 +242,7 @@ public class ContextFunctionCatalogAutoConfiguration { else if (!isFluxConsumer(key, target)) { @SuppressWarnings({ "unchecked", "rawtypes" }) FluxConsumer value = new FluxConsumer(target); - return value; + return wrapConsumer(value, mapper, key); } else { return target; @@ -367,16 +367,38 @@ public class ContextFunctionCatalogAutoConfiguration { return wrapped; } - private Class findType(RootBeanDefinition definition, int index) { - StandardMethodMetadata source = (StandardMethodMetadata) definition - .getSource(); + private Class findType(AbstractBeanDefinition definition, int index) { + Object source = definition.getSource(); Type param; if (source instanceof StandardMethodMetadata) { ParameterizedType type; - type = (ParameterizedType) (source.getIntrospectedMethod() - .getGenericReturnType()); - type = (ParameterizedType) type.getActualTypeArguments()[index]; - param = type.getActualTypeArguments()[0]; + type = (ParameterizedType) ((StandardMethodMetadata) source).getIntrospectedMethod() + .getGenericReturnType(); + Type typeArgumentAtIndex = type.getActualTypeArguments()[index]; + if (typeArgumentAtIndex instanceof ParameterizedType) { + param = ((ParameterizedType) typeArgumentAtIndex).getActualTypeArguments()[0]; + } + else { + param = typeArgumentAtIndex; + } + } + else if (source instanceof FileSystemResource) { + try { + Type type = ClassUtils.forName(definition.getBeanClassName(), null); + if (type instanceof ParameterizedType) { + Type typeArgumentAtIndex = ((ParameterizedType)type).getActualTypeArguments()[index]; + if (typeArgumentAtIndex instanceof ParameterizedType) { + param = ((ParameterizedType) typeArgumentAtIndex).getActualTypeArguments()[0]; + } else { + param = typeArgumentAtIndex; + } + } + else { + param = type; + } + } catch (ClassNotFoundException e) { + throw new IllegalStateException("Cannot instrospect bean: " + definition, e); + } } else { ResolvableType resolvable = (ResolvableType) getField(definition, @@ -398,11 +420,11 @@ public class ContextFunctionCatalogAutoConfiguration { } private Class findType(String name) { - return findType((RootBeanDefinition) registry.getBeanDefinition(name), 0); + return findType((AbstractBeanDefinition) registry.getBeanDefinition(name), 0); } private Class findOutputType(String name) { - return findType((RootBeanDefinition) registry.getBeanDefinition(name), 1); + return findType((AbstractBeanDefinition) registry.getBeanDefinition(name), 1); } } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxPojoStreamingConsumerTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxPojoStreamingConsumerTests.java index c790e0413..f4ef80798 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxPojoStreamingConsumerTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxPojoStreamingConsumerTests.java @@ -66,7 +66,7 @@ public class FluxPojoStreamingConsumerTests { @Bean public Consumer> sinkConsumer(final List sinkCollector) { - return foos -> foos.doOnNext(s -> sinkCollector.add(s)); + return foos -> foos.subscribe(s -> sinkCollector.add(s)); } } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxStreamingConsumerTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxStreamingConsumerTests.java index 8393067f9..a774f354f 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxStreamingConsumerTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/FluxStreamingConsumerTests.java @@ -52,7 +52,7 @@ public class FluxStreamingConsumerTests { @Test public void test() throws Exception { - sink.input().send(MessageBuilder.withPayload(new Foo("foo")).build()); + sink.input().send(MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build()); assertThat(sinkCollector).hasSize(1); } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/PojoStreamingConsumerTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/PojoStreamingConsumerTests.java index b3574723f..1ed498001 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/PojoStreamingConsumerTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/consumer/PojoStreamingConsumerTests.java @@ -50,7 +50,7 @@ public class PojoStreamingConsumerTests { @Test public void test() throws Exception { - sink.input().send(MessageBuilder.withPayload(new Foo("foo")).build()); + sink.input().send(MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build()); assertThat(sinkCollector).hasSize(1); }