From c728cd4c0102d0403c020249df5802abbf32ad0c Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Wed, 14 Feb 2018 14:09:18 +0000 Subject: [PATCH] Support for Function,...> --- README.adoc | 6 +++-- .../context/catalog/FunctionInspector.java | 4 ++- ...FunctionCatalogAutoConfigurationTests.java | 27 +++++++++++++++++-- .../function/core/FunctionFactoryUtils.java | 5 +++- .../core/FunctionFactoryUtilsTests.java | 14 +++++++++- .../function/web/flux/FunctionController.java | 2 +- 6 files changed, 50 insertions(+), 8 deletions(-) diff --git a/README.adoc b/README.adoc index 6c5905563..28a668033 100644 --- a/README.adoc +++ b/README.adoc @@ -111,8 +111,10 @@ Stream `Source`. Functions can be of `Flux` or `Flux` and Spring Cloud Function takes care of converting the data to and from the desired types, as long as it comes in as plain text or (in the case of the -POJO) JSON. TBD: support for `Flux>` and maybe plain -`Pojo` types (Fluxes implied and implemented by the framework). +POJO) JSON. Also works for `Flux>` and plain +`Pojo` types (where `Flux` is implied and implemented by the framework). +Additionally, you can replace `Flux` by `Publisher` in any function +definition and it should work that way too. Functions can be grouped together in a single application, or deployed one-per-jar. It's up to the developer to choose. An app with multiple diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionInspector.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionInspector.java index dbd975025..8107a3128 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionInspector.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionInspector.java @@ -19,6 +19,8 @@ package org.springframework.cloud.function.context.catalog; import java.lang.reflect.Type; import java.util.Optional; +import org.reactivestreams.Publisher; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -44,7 +46,7 @@ public interface FunctionInspector { // Maybe make this a default method? static boolean isWrapper(Type type) { - return Flux.class.equals(type) || Mono.class.equals(type) + return Publisher.class.equals(type) || Flux.class.equals(type) || Mono.class.equals(type) || Optional.class.equals(type); } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationTests.java index c35c13472..6391abb44 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationTests.java @@ -16,8 +16,6 @@ package org.springframework.cloud.function.context.config; -import static org.assertj.core.api.Assertions.assertThat; - import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; @@ -30,6 +28,7 @@ import java.util.stream.Collectors; import org.junit.After; import org.junit.Test; +import org.reactivestreams.Publisher; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Qualifier; @@ -63,6 +62,8 @@ import org.springframework.util.ClassUtils; import org.springframework.util.ReflectionUtils; import org.springframework.util.StreamUtils; +import static org.assertj.core.api.Assertions.assertThat; + import reactor.core.publisher.Flux; /** @@ -175,6 +176,18 @@ public class ContextFunctionCatalogAutoConfigurationTests { .isAssignableFrom(Flux.class); } + @Test + public void publisherMessageFunction() { + create(PublisherMessageConfiguration.class); + assertThat(context.getBean("function")).isInstanceOf(Function.class); + assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class); + assertThat(inspector.isMessage(catalog.lookupFunction("function"))).isTrue(); + assertThat(inspector.getInputType(catalog.lookupFunction("function"))) + .isAssignableFrom(String.class); + assertThat(inspector.getInputWrapper(catalog.lookupFunction("function"))) + .isAssignableFrom(Publisher.class); + } + @Test public void messageFunction() { create(MessageConfiguration.class); @@ -572,6 +585,16 @@ public class ContextFunctionCatalogAutoConfigurationTests { } } + @EnableAutoConfiguration + @Configuration + protected static class PublisherMessageConfiguration { + @Bean + public Function>, Publisher>> function() { + return flux -> Flux.from(flux).map(m -> MessageBuilder + .withPayload(m.getPayload().toUpperCase()).build()); + } + } + @EnableAutoConfiguration @Configuration protected static class MessageConfiguration { diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionFactoryUtils.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionFactoryUtils.java index b07e52dfe..253714c02 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionFactoryUtils.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionFactoryUtils.java @@ -27,6 +27,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.reactivestreams.Publisher; + import org.springframework.context.annotation.Bean; import org.springframework.util.ObjectUtils; import org.springframework.util.ReflectionUtils; @@ -54,6 +56,7 @@ import reactor.core.publisher.Flux; public abstract class FunctionFactoryUtils { private static final String FLUX_CLASS_NAME = Flux.class.getName(); + private static final String PUBLISHER_CLASS_NAME = Publisher.class.getName(); private FunctionFactoryUtils() { } @@ -130,6 +133,6 @@ public abstract class FunctionFactoryUtils { } private static boolean isFlux(int length, String... types){ - return !ObjectUtils.isEmpty(types) && types.length == length && Stream.of(types).allMatch(type -> type.startsWith(FLUX_CLASS_NAME)); + return !ObjectUtils.isEmpty(types) && types.length == length && Stream.of(types).allMatch(type -> type.startsWith(FLUX_CLASS_NAME) || type.startsWith(PUBLISHER_CLASS_NAME)); } } diff --git a/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/core/FunctionFactoryUtilsTests.java b/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/core/FunctionFactoryUtilsTests.java index 66e48f578..eb72b288e 100644 --- a/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/core/FunctionFactoryUtilsTests.java +++ b/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/core/FunctionFactoryUtilsTests.java @@ -22,8 +22,8 @@ import java.util.function.Function; import java.util.function.Supplier; import org.junit.Test; +import org.reactivestreams.Publisher; -import org.springframework.cloud.function.core.FunctionFactoryUtils; import org.springframework.util.ReflectionUtils; import static org.assertj.core.api.Assertions.assertThat; @@ -60,10 +60,22 @@ public class FunctionFactoryUtilsTests { assertThat(FunctionFactoryUtils.isFluxConsumer(method)).isFalse(); } + @Test + public void isReactiveFunction() { + Method method = ReflectionUtils.findMethod(FunctionFactoryUtilsTests.class, "reactiveFunction"); + assertThat(FunctionFactoryUtils.isFluxFunction(method)).isTrue(); + assertThat(FunctionFactoryUtils.isFluxSupplier(method)).isFalse(); + assertThat(FunctionFactoryUtils.isFluxConsumer(method)).isFalse(); + } + public Function, Flux> fluxFunction() { return foos -> foos.map(foo -> new Foo()); } + public Function, Publisher> reactiveFunction() { + return foos -> Flux.from(foos).map(foo -> new Foo()); + } + public Supplier> fluxSupplier() { return () -> Flux.just(new Foo()); } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java index b12816670..384a9ca0d 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java @@ -71,7 +71,7 @@ public class FunctionController { if (debug) { flux = flux.log(); } - Flux result = function.apply(flux); + Flux result = Flux.from(function.apply(flux)); if (logger.isDebugEnabled()) { logger.debug("Handled POST with function"); }