diff --git a/docs/src/main/asciidoc/spring-cloud-function.adoc b/docs/src/main/asciidoc/spring-cloud-function.adoc index a43e4ebeb..2cb864b29 100644 --- a/docs/src/main/asciidoc/spring-cloud-function.adoc +++ b/docs/src/main/asciidoc/spring-cloud-function.adoc @@ -19,6 +19,53 @@ include::https://raw.githubusercontent.com/spring-cloud/spring-cloud-build/maste include::getting-started.adoc[] +== Function Catalog and Flexible Function Signatures + +One of the main features of Spring Cloud Function is to adapt and +support a range of type signatures for user-defined functions. So +users can supply a bean of type `Function`, for +instance, and the `FunctionCatalog` will wrap it into a +`Function,Flux>`. Users don't normally have to +care about the `FunctionCatalog` at all, but it is useful to know what +kind of functions are supported in user code. + +Generally speaking users can expect that if they write a function for +a plain old Java type (or primitive wrapper), then the function +catalog will wrap it to a `Flux` of the same type. If the user writes +a function using `Message` (from spring-messaging) it will receive and +transmit headers from any adapter that supports key-value metadata +(e.g. HTTP headers). Here are the details. + +|=== +| User Function | Catalog Registration | + +| `Function` | `Function, Flux>` | +| `Function,Message>` | `Function>, Flux>>` | +| `Function, Flux>` | `Function, Flux>` (pass through) | +| `Supplier` | `Supplier>` | +| `Supplier>` | `Supplier>` | +| `Consumer` | `Function, Mono>` | +| `Consumer>` | `Function>, Mono>` | +| `Consumer>` | `Consumer>` | + +Consumer is a little bit special because it has a `void` return type, +which implies blocking, at least potentially. Most likely you will not +need to write `Consumer>`, but if you do need to do that, +remember to subscribe to the input flux. If you declare a `Consumer` +of a non publisher type (which is normal), it will be converted to a +function that returns a publisher, so that it can be subscribed to in +a controlled way. + +A function catalog can contain a `Supplier` and a `Function` (or +`Consumer`) with the same name (like a GET and a POST to the same +resource). It can even contain a `Consumer>` with the same name +as a `Function`, but it cannot contain a `Consumer` and a +`Function` with the same name when `T` is not a `Publisher` +because the consumer would be converted to a `Function` and only one +of them can be registered. + +|=== + == Standalone Web Applications The `spring-cloud-function-web` module has autoconfiguration that diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java index 323470ebf..1f5943657 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java @@ -29,6 +29,7 @@ import java.util.jar.Manifest; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -49,7 +50,7 @@ public class SpringFunctionInitializer implements Closeable { private final Class configurationClass; - private Function, Flux> function; + private Function, Publisher> function; private Consumer> consumer; @@ -104,16 +105,20 @@ public class SpringFunctionInitializer implements Closeable { if (defaultName) { name = "consumer"; } - this.consumer = this.catalog.lookup(Consumer.class, name); - if (this.consumer == null) { - if (defaultName) { - name = "supplier"; + this.function = this.catalog.lookup(Function.class, name); + if (this.function == null) { + this.consumer = this.catalog.lookup(Consumer.class, name); + if (this.consumer == null) { + if (defaultName) { + name = "supplier"; + } + this.supplier = this.catalog.lookup(Supplier.class, name); } - this.supplier = this.catalog.lookup(Supplier.class, name); } } } this.context = context; + } private SpringApplicationBuilder springApplication() { @@ -143,7 +148,7 @@ public class SpringFunctionInitializer implements Closeable { protected Flux apply(Flux input) { if (this.function != null) { - return function.apply(input); + return Flux.from(function.apply(input)); } if (this.consumer != null) { this.consumer.accept(input); diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializerTests.java index 84be63958..8e4d209be 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializerTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializerTests.java @@ -38,21 +38,20 @@ import reactor.core.publisher.Flux; * */ public class SpringFunctionInitializerTests { - + private SpringFunctionInitializer initializer; @After public void after() { System.clearProperty("function.name"); - if (initializer!=null) { + if (initializer != null) { initializer.close(); } } @Test public void functionBean() { - initializer = new SpringFunctionInitializer( - FluxFunctionConfig.class); + initializer = new SpringFunctionInitializer(FluxFunctionConfig.class); initializer.initialize(); Flux result = initializer.apply(Flux.just(new Foo())); assertThat(result.blockFirst()).isInstanceOf(Bar.class); @@ -60,8 +59,7 @@ public class SpringFunctionInitializerTests { @Test public void functionCatalog() { - initializer = new SpringFunctionInitializer( - FunctionConfig.class); + initializer = new SpringFunctionInitializer(FunctionConfig.class); initializer.initialize(); Flux result = initializer.apply(Flux.just(new Foo())); assertThat(result.blockFirst()).isInstanceOf(Bar.class); @@ -69,8 +67,7 @@ public class SpringFunctionInitializerTests { @Test public void namedFunctionCatalog() { - initializer = new SpringFunctionInitializer( - NamedFunctionConfig.class); + initializer = new SpringFunctionInitializer(NamedFunctionConfig.class); System.setProperty("function.name", "other"); initializer.initialize(); Flux result = initializer.apply(Flux.just(new Foo())); @@ -79,8 +76,7 @@ public class SpringFunctionInitializerTests { @Test public void consumerCatalog() { - initializer = new SpringFunctionInitializer( - ConsumerConfig.class); + initializer = new SpringFunctionInitializer(ConsumerConfig.class); initializer.initialize(); Flux result = initializer.apply(Flux.just(new Foo())); assertThat(result.toStream().collect(Collectors.toList())).isEmpty(); @@ -88,8 +84,7 @@ public class SpringFunctionInitializerTests { @Test public void supplierCatalog() { - initializer = new SpringFunctionInitializer( - SupplierConfig.class); + initializer = new SpringFunctionInitializer(SupplierConfig.class); initializer.initialize(); Flux result = initializer.apply(Flux.empty()); assertThat(result.blockFirst()).isInstanceOf(Bar.class); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 6dc0a93ac..0f8bf69a0 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -405,23 +405,22 @@ public class ContextFunctionCatalogAutoConfiguration { findType(target); } Class type; + target = target(target, key); + registration.target(target); if (target instanceof Supplier) { type = Supplier.class; - registration.target(target((Supplier) target, key)); for (String name : registration.getNames()) { this.suppliers.put(name, registration.getTarget()); } } else if (target instanceof Consumer) { type = Consumer.class; - registration.target(target((Consumer) target, key)); for (String name : registration.getNames()) { this.consumers.put(name, registration.getTarget()); } } else if (target instanceof Function) { type = Function.class; - registration.target(target((Function) target, key)); for (String name : registration.getNames()) { this.functions.put(name, registration.getTarget()); } @@ -454,34 +453,34 @@ public class ContextFunctionCatalogAutoConfiguration { } @SuppressWarnings({ "unchecked", "rawtypes" }) - private T target(T target, String key) { + private Object target(Object target, String key) { boolean isolated = getClass().getClassLoader() != target.getClass() .getClassLoader(); if (target instanceof Supplier) { boolean flux = isFluxSupplier(key, (Supplier) target); if (isolated) { - target = (T) new IsolatedSupplier((Supplier) target); + target = new IsolatedSupplier((Supplier) target); } if (!flux) { - target = (T) new FluxSupplier((Supplier) target); + target = new FluxSupplier((Supplier) target); } } else if (target instanceof Function) { boolean flux = isFluxFunction(key, (Function) target); if (isolated) { - target = (T) new IsolatedFunction((Function) target); + target = new IsolatedFunction((Function) target); } if (!flux) { - target = (T) new FluxFunction((Function) target); + target = new FluxFunction((Function) target); } } else if (target instanceof Consumer) { boolean flux = isFluxConsumer(key, (Consumer) target); if (isolated) { - target = (T) new IsolatedConsumer((Consumer) target); + target = new IsolatedConsumer((Consumer) target); } if (!flux) { - target = (T) new FluxConsumer((Consumer) target); + target = new FluxConsumer((Consumer) target); } } return target; diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java index 2a5218f86..6bce9dbae 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java @@ -32,6 +32,7 @@ import org.springframework.cloud.function.context.config.ContextFunctionCatalogA import static org.assertj.core.api.Assertions.assertThat; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author Dave Syer @@ -88,7 +89,8 @@ public class BeanFactoryFunctionCatalogTests { public void composeFunction() { processor.register(new FunctionRegistration<>(new Foos()).names("foos")); processor.register(new FunctionRegistration<>(new Bars()).names("bars")); - Function, Flux> foos = processor.lookup(Function.class, "foos,bars"); + Function, Flux> foos = processor.lookup(Function.class, + "foos,bars"); assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4"); } @@ -112,8 +114,9 @@ public class BeanFactoryFunctionCatalogTests { processor.register(new FunctionRegistration<>(new Foos()).names("foos")); Sink sink = new Sink(); processor.register(new FunctionRegistration<>(sink).names("sink")); - Consumer> foos = processor.lookup(Consumer.class, "foos,sink"); - foos.accept(Flux.just(2)); + Function, Mono> foos = processor.lookup(Function.class, + "foos,sink"); + foos.apply(Flux.just(2)).subscribe(); assertThat(sink.values).contains("4"); } @@ -121,8 +124,8 @@ public class BeanFactoryFunctionCatalogTests { public void composeUniqueConsumer() { Sink sink = new Sink(); processor.register(new FunctionRegistration<>(sink).names("sink")); - Consumer> foos = processor.lookup(Consumer.class, ""); - foos.accept(Flux.just("2")); + Function, Mono> foos = processor.lookup(Function.class, ""); + foos.apply(Flux.just("2")).subscribe(); assertThat(sink.values).contains("2"); } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationTests.java index 437f18896..2d50283e7 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationTests.java @@ -66,6 +66,7 @@ import org.springframework.util.StreamUtils; import static org.assertj.core.api.Assertions.assertThat; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author Dave Syer @@ -115,11 +116,11 @@ public class ContextFunctionCatalogAutoConfigurationTests { assertThat(context.getBean("foos")).isInstanceOf(Function.class); assertThat(catalog.>lookup(Function.class, "foos")) .isInstanceOf(Function.class); - assertThat(catalog.>lookup(Consumer.class, "foos")) - .isInstanceOf(Consumer.class); + assertThat(catalog.>lookup(Supplier.class, "foos")) + .isInstanceOf(Supplier.class); assertThat(inspector.getInputType(catalog.lookup(Function.class, "foos"))) .isEqualTo(String.class); - assertThat(inspector.getInputType(catalog.lookup(Consumer.class, "foos"))) + assertThat(inspector.getOutputType(catalog.lookup(Supplier.class, "foos"))) .isEqualTo(Foo.class); } @@ -185,13 +186,13 @@ public class ContextFunctionCatalogAutoConfigurationTests { @Test public void composedConsumer() { create(MultipleConfiguration.class); - assertThat(catalog.>lookup(Consumer.class, "foos,print")) - .isInstanceOf(Consumer.class); - assertThat(catalog.>lookup(Function.class, "foos,print")).isNull(); - assertThat(inspector.getInputType(catalog.lookup(Consumer.class, "foos,print"))) + assertThat(catalog.>lookup(Consumer.class, "foos,print")).isNull(); + assertThat(catalog.>lookup(Function.class, "foos,print")) + .isInstanceOf(Function.class); + assertThat(inspector.getInputType(catalog.lookup(Function.class, "foos,print"))) .isAssignableFrom(String.class); // The output type is the same as the output type of the last element in the chain - assertThat(inspector.getOutputType(catalog.lookup(Consumer.class, "foos,print"))) + assertThat(inspector.getOutputType(catalog.lookup(Function.class, "foos,print"))) .isAssignableFrom(Void.class); } @@ -391,8 +392,9 @@ public class ContextFunctionCatalogAutoConfigurationTests { public void simpleConsumer() { create(SimpleConfiguration.class); assertThat(context.getBean("consumer")).isInstanceOf(Consumer.class); - Consumer> consumer = catalog.lookup(Consumer.class, "consumer"); - consumer.accept(Flux.just("foo", "bar")); + Function, Mono> consumer = catalog.lookup(Function.class, + "consumer"); + consumer.apply(Flux.just("foo", "bar")).subscribe(); assertThat(context.getBean(SimpleConfiguration.class).list).hasSize(2); } @@ -464,9 +466,9 @@ public class ContextFunctionCatalogAutoConfigurationTests { + "::set", "spring.cloud.function.compile.foos.type=consumer", "spring.cloud.function.compile.foos.inputType=String"); - assertThat(catalog.>lookup(Consumer.class, "foos")) - .isInstanceOf(Consumer.class); - assertThat(inspector.getInputWrapper(catalog.lookup(Consumer.class, "foos"))) + assertThat(catalog.>lookup(Function.class, "foos")) + .isInstanceOf(Function.class); + assertThat(inspector.getInputWrapper(catalog.lookup(Function.class, "foos"))) .isEqualTo(String.class); @SuppressWarnings("unchecked") Consumer consumer = (Consumer) context.getBean("foos"); @@ -602,7 +604,6 @@ public class ContextFunctionCatalogAutoConfigurationTests { @EnableAutoConfiguration @Configuration protected static class AmbiguousConfiguration { - private List list = new ArrayList<>(); @Bean public Function foos() { @@ -611,8 +612,8 @@ public class ContextFunctionCatalogAutoConfigurationTests { @Bean @Qualifier("foos") - public Consumer consumer() { - return value -> list.add(value); + public Supplier supplier() { + return () -> new Foo("bar"); } } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java index e76902b16..fcff567c0 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java @@ -28,14 +28,15 @@ import org.junit.After; import org.junit.Test; import org.springframework.beans.BeanUtils; -import org.springframework.test.util.ReflectionTestUtils; -import org.springframework.util.ClassUtils; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.ContextFunctionRegistry; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.util.ClassUtils; import static org.assertj.core.api.Assertions.assertThat; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author Dave Syer @@ -124,7 +125,8 @@ public class ContextFunctionPostProcessorTests { assertThat(foos.get().blockFirst()).isEqualTo("8"); assertThat(processor.getRegistration(foos).getNames()) .containsExactly("ints|foos"); - assertThat(processor.getRegistration(foos).getType().getOutputWrapper()).isEqualTo(Flux.class); + assertThat(processor.getRegistration(foos).getType().getOutputWrapper()) + .isEqualTo(Flux.class); } @Test @@ -157,9 +159,9 @@ public class ContextFunctionPostProcessorTests { Object target = create(Sink.class); processor.register(new FunctionRegistration<>(target).names("sink")); @SuppressWarnings("unchecked") - Consumer> sink = (Consumer>) processor - .lookupConsumer("sink"); - sink.accept(Flux.just("Hello")); + Function, Mono> sink = (Function, Mono>) processor + .lookupFunction("sink"); + sink.apply(Flux.just("Hello")).subscribe(); @SuppressWarnings("unchecked") List values = (List) ReflectionTestUtils.getField(target, "values"); diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java index 4970e3bbf..3add72f47 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java @@ -17,18 +17,21 @@ package org.springframework.cloud.function.core; import java.util.function.Consumer; +import java.util.function.Function; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** - * {@link Consumer} implementation that wraps a target Consumer so that the target's - * simple input type will be wrapped as a {@link Flux} instance. + * Wrapper for a {@link Consumer} implementation that converts a non-reactive consumer + * into a reactive function. * * @author Dave Syer * * @param input type of target consumer */ -public class FluxConsumer implements Consumer>, FluxWrapper> { +public class FluxConsumer + implements Function, Mono>, FluxWrapper> { private final Consumer consumer; @@ -42,7 +45,7 @@ public class FluxConsumer implements Consumer>, FluxWrapper input) { - input.subscribe(t -> consumer.accept(t)); + public Mono apply(Flux input) { + return input.doOnNext(consumer).then(); } } diff --git a/spring-cloud-function-samples/function-sample-compiler/src/test/java/com/example/SampleCompiledConsumerTests.java b/spring-cloud-function-samples/function-sample-compiler/src/test/java/com/example/SampleCompiledConsumerTests.java index 1fd4a958f..37dc3a411 100644 --- a/spring-cloud-function-samples/function-sample-compiler/src/test/java/com/example/SampleCompiledConsumerTests.java +++ b/spring-cloud-function-samples/function-sample-compiler/src/test/java/com/example/SampleCompiledConsumerTests.java @@ -44,8 +44,7 @@ public class SampleCompiledConsumerTests { @Test public void print() { assertThat(new TestRestTemplate().postForObject( - "http://localhost:" + port + "/test", "it works", String.class)) - .isEqualTo("it works"); + "http://localhost:" + port + "/test", "it works", String.class)).isNull(); assertEquals("it works", Reference.instance); } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/AbstractStreamListeningInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/AbstractStreamListeningInvoker.java index 61e4c9ae7..0daa61bc8 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/AbstractStreamListeningInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/AbstractStreamListeningInvoker.java @@ -24,6 +24,8 @@ import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; +import org.reactivestreams.Publisher; + import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.FunctionInspector; @@ -89,10 +91,11 @@ public abstract class AbstractStreamListeningInvoker protected Flux> function(String name, Flux> flux) { Function> function = functionCatalog.lookup(Function.class, name); return flux.publish(values -> { - Flux result = function + Publisher result = function .apply(values.map(message -> convertInput(function).apply(message))); if (this.functionInspector.isMessage(function)) { - result = result.map(message -> MessageUtils.unpack(function, message)); + result = Flux.from(result) + .map(message -> MessageUtils.unpack(function, message)); } Flux> aggregate = headers(values); return aggregate.withLatestFrom(result, diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java index 514bb7296..f523a83eb 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.java @@ -23,11 +23,9 @@ import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; import org.springframework.cloud.stream.messaging.Processor; -import org.springframework.cloud.stream.reactive.FluxSender; import org.springframework.messaging.Message; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * @author Mark Fisher @@ -43,10 +41,9 @@ public class StreamListeningFunctionInvoker extends AbstractStreamListeningInvok } @StreamListener - public Mono handle(@Input(Processor.INPUT) Flux> input, - @Output(Processor.OUTPUT) FluxSender output) { - return output.send( - input.groupBy(this::select).flatMap(group -> group.key().process(group))); + @Output(Processor.OUTPUT) + public Flux> handle(@Input(Processor.INPUT) Flux> input) { + return input.groupBy(this::select).flatMap(group -> group.key().process(group)); } } diff --git a/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskConfiguration.java b/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskConfiguration.java index 19ef8d0a8..b3e93a5bb 100644 --- a/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskConfiguration.java +++ b/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskConfiguration.java @@ -30,6 +30,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author Mark Fisher @@ -45,9 +46,11 @@ public class TaskConfiguration { @Bean public CommandLineRunner commandLineRunner(FunctionCatalog registry) { - final Supplier> supplier = registry.lookup(Supplier.class, properties.getSupplier()); - final Function, Flux> function = registry.lookup(Function.class, properties.getFunction()); - final Consumer> consumer = registry.lookup(Consumer.class, properties.getConsumer()); + final Supplier> supplier = registry.lookup(Supplier.class, + properties.getSupplier()); + final Function, Flux> function = registry + .lookup(Function.class, properties.getFunction()); + final Consumer> consumer = consumer(registry); CommandLineRunner runner = new CommandLineRunner() { @Override @@ -57,4 +60,15 @@ public class TaskConfiguration { }; return runner; } + + private Consumer> consumer(FunctionCatalog registry) { + Consumer> consumer = registry.lookup(Consumer.class, + properties.getConsumer()); + if (consumer != null) { + return consumer; + } + Function, Mono> function = registry.lookup(Function.class, + properties.getConsumer()); + return flux -> function.apply(flux).subscribe(); + } } diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java index ff701b3d2..cba4952b0 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java @@ -191,9 +191,9 @@ public class RestApplicationTests { ResponseEntity result = rest.exchange(RequestEntity .post(new URI("/bareUpdates")).contentType(MediaType.APPLICATION_JSON) .body("[\"one\",\"two\"]"), String.class); - assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); assertThat(test.list).hasSize(2); - assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]"); + assertThat(result.getBody()).isEqualTo("[]"); } @Test