From b59b43ddc5e1642b2b9193ec83c923d32fb798cf Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Tue, 1 May 2018 11:46:29 -0400 Subject: [PATCH] Only expose Publisher via FunctionCatalog Flux.from() is cheap and can be used to marshal the inputs everywhere internally. With this change users ought to be able to register any function of any Publisher type. --- .../adapter/aws/SpringBootRequestHandler.java | 8 +++++--- .../adapter/aws/SpringBootStreamHandler.java | 8 +++++--- .../adapter/aws/SpringFunctionInitializer.java | 8 ++++---- .../aws/SpringFunctionInitializerTests.java | 10 +++++----- .../azure/AzureSpringBootRequestHandler.java | 8 +++++--- .../azure/AzureSpringFunctionInitializer.java | 9 ++++++--- .../openwhisk/OpenWhiskActionHandler.java | 17 ++++++++++++----- .../OpenWhiskFunctionInitializer.java | 9 +++++---- .../context/config/FluxWrapperDetector.java | 10 +++++----- .../config/AbstractStreamListeningInvoker.java | 4 ++-- .../SupplierInvokingMessageProducer.java | 8 +++++--- .../cloud/function/task/TaskConfiguration.java | 17 +++++++++-------- .../function/web/flux/FunctionController.java | 18 +++++++++--------- .../web/flux/FunctionHandlerMapping.java | 6 ++++-- 14 files changed, 81 insertions(+), 59 deletions(-) diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootRequestHandler.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootRequestHandler.java index 46516d373..1610ace3d 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootRequestHandler.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootRequestHandler.java @@ -23,6 +23,8 @@ import java.util.List; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; +import org.reactivestreams.Publisher; + import reactor.core.publisher.Flux; /** @@ -43,13 +45,13 @@ public class SpringBootRequestHandler extends SpringFunctionInitializer public Object handleRequest(E event, Context context) { initialize(); Object input = convertEvent(event); - Flux output = apply(extract(input)); + Publisher output = apply(extract(input)); return result(input, output); } - private Object result(Object input, Flux output) { + private Object result(Object input, Publisher output) { List result = new ArrayList<>(); - for (Object value : output.toIterable()) { + for (Object value : Flux.from(output).toIterable()) { result.add(convertOutput(value)); } if (isSingleValue(input) && result.size() == 1) { diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootStreamHandler.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootStreamHandler.java index b3510b486..c33f14158 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootStreamHandler.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootStreamHandler.java @@ -27,6 +27,8 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.fasterxml.jackson.databind.ObjectMapper; +import org.reactivestreams.Publisher; + import org.springframework.beans.factory.annotation.Autowired; import reactor.core.publisher.Flux; @@ -53,13 +55,13 @@ public class SpringBootStreamHandler extends SpringFunctionInitializer throws IOException { initialize(); Object value = convertStream(input); - Flux flux = apply(extract(value)); + Publisher flux = apply(extract(value)); mapper.writeValue(output, result(value, flux)); } - private Object result(Object input, Flux flux) { + private Object result(Object input, Publisher flux) { List result = new ArrayList<>(); - for (Object value : flux.toIterable()) { + for (Object value : Flux.from(flux).toIterable()) { result.add(value); } if (isSingleValue(input) && result.size()==1) { diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java index 1f5943657..abf8fa309 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java @@ -50,11 +50,11 @@ public class SpringFunctionInitializer implements Closeable { private final Class configurationClass; - private Function, Publisher> function; + private Function, Publisher> function; - private Consumer> consumer; + private Consumer> consumer; - private Supplier> supplier; + private Supplier> supplier; private AtomicBoolean initialized = new AtomicBoolean(); @@ -146,7 +146,7 @@ public class SpringFunctionInitializer implements Closeable { : (this.consumer != null ? this.consumer : this.supplier); } - protected Flux apply(Flux input) { + protected Publisher apply(Publisher input) { if (this.function != null) { return Flux.from(function.apply(input)); } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializerTests.java index 8e4d209be..801d5c23c 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializerTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializerTests.java @@ -53,7 +53,7 @@ public class SpringFunctionInitializerTests { public void functionBean() { initializer = new SpringFunctionInitializer(FluxFunctionConfig.class); initializer.initialize(); - Flux result = initializer.apply(Flux.just(new Foo())); + Flux result = Flux.from(initializer.apply(Flux.just(new Foo()))); assertThat(result.blockFirst()).isInstanceOf(Bar.class); } @@ -61,7 +61,7 @@ public class SpringFunctionInitializerTests { public void functionCatalog() { initializer = new SpringFunctionInitializer(FunctionConfig.class); initializer.initialize(); - Flux result = initializer.apply(Flux.just(new Foo())); + Flux result = Flux.from(initializer.apply(Flux.just(new Foo()))); assertThat(result.blockFirst()).isInstanceOf(Bar.class); } @@ -70,7 +70,7 @@ public class SpringFunctionInitializerTests { initializer = new SpringFunctionInitializer(NamedFunctionConfig.class); System.setProperty("function.name", "other"); initializer.initialize(); - Flux result = initializer.apply(Flux.just(new Foo())); + Flux result = Flux.from(initializer.apply(Flux.just(new Foo()))); assertThat(result.blockFirst()).isInstanceOf(Bar.class); } @@ -78,7 +78,7 @@ public class SpringFunctionInitializerTests { public void consumerCatalog() { initializer = new SpringFunctionInitializer(ConsumerConfig.class); initializer.initialize(); - Flux result = initializer.apply(Flux.just(new Foo())); + Flux result = Flux.from(initializer.apply(Flux.just(new Foo()))); assertThat(result.toStream().collect(Collectors.toList())).isEmpty(); } @@ -86,7 +86,7 @@ public class SpringFunctionInitializerTests { public void supplierCatalog() { initializer = new SpringFunctionInitializer(SupplierConfig.class); initializer.initialize(); - Flux result = initializer.apply(Flux.empty()); + Flux result = Flux.from(initializer.apply(Flux.empty())); assertThat(result.blockFirst()).isInstanceOf(Bar.class); } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootRequestHandler.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootRequestHandler.java index 0f206ab90..190ec0098 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootRequestHandler.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootRequestHandler.java @@ -22,6 +22,8 @@ import java.util.List; import com.microsoft.azure.serverless.functions.ExecutionContext; +import org.reactivestreams.Publisher; + import reactor.core.publisher.Flux; /** @@ -44,7 +46,7 @@ public class AzureSpringBootRequestHandler extends AzureSpringFunctionInit initialize(context); Object convertedEvent = convertEvent(foo); - Flux output = apply(extract(convertedEvent)); + Publisher output = apply(extract(convertedEvent)); return result(convertedEvent, output); } @@ -59,9 +61,9 @@ public class AzureSpringBootRequestHandler extends AzureSpringFunctionInit return Flux.just(input); } - private O result(Object input, Flux output) { + private O result(Object input, Publisher output) { List result = new ArrayList<>(); - for (Object value : output.toIterable()) { + for (Object value : Flux.from(output).toIterable()) { result.add(convertOutput(value)); } if (isSingleValue(input) && result.size() == 1) { diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringFunctionInitializer.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringFunctionInitializer.java index cc165a886..2cf268bc8 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringFunctionInitializer.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringFunctionInitializer.java @@ -28,7 +28,8 @@ import java.util.function.Function; import java.util.jar.Manifest; import com.microsoft.azure.serverless.functions.ExecutionContext; -import reactor.core.publisher.Flux; + +import org.reactivestreams.Publisher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -37,12 +38,14 @@ import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.util.ClassUtils; +import reactor.core.publisher.Flux; + /** * @author Soby Chacko */ public class AzureSpringFunctionInitializer implements Closeable { - private Function, Flux> function; + private Function, Publisher> function; private AtomicBoolean initialized = new AtomicBoolean(); @@ -110,7 +113,7 @@ public class AzureSpringFunctionInitializer implements Closeable { } } - protected Flux apply(Flux input) { + protected Publisher apply(Publisher input) { if (this.function != null) { return function.apply(input); } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-openwhisk/src/main/java/org/springframework/cloud/function/adapter/openwhisk/OpenWhiskActionHandler.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-openwhisk/src/main/java/org/springframework/cloud/function/adapter/openwhisk/OpenWhiskActionHandler.java index 14a99b8eb..6f40f720b 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-openwhisk/src/main/java/org/springframework/cloud/function/adapter/openwhisk/OpenWhiskActionHandler.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-openwhisk/src/main/java/org/springframework/cloud/function/adapter/openwhisk/OpenWhiskActionHandler.java @@ -16,17 +16,24 @@ package org.springframework.cloud.function.adapter.openwhisk; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; -import reactor.core.publisher.Flux; -import java.util.*; +import reactor.core.publisher.Flux; /** * @author Mark Fisher @@ -55,15 +62,15 @@ public class OpenWhiskActionHandler extends OpenWhiskFunctionInitializer { Object input = convertEvent(request.getValue()); Object result = NO_INPUT_PROVIDED; if(input !=null ) { - Flux output = apply(extract(input)); + Publisher output = apply(extract(input)); result = result(input, output); } return serializeBody(result); } - private Object result(Object input, Flux output) { + private Object result(Object input, Publisher output) { List result = new ArrayList<>(); - for (Object value : output.toIterable()) { + for (Object value : Flux.from(output).toIterable()) { result.add(value); } if (isSingleValue(input) && result.size() == 1) { diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-openwhisk/src/main/java/org/springframework/cloud/function/adapter/openwhisk/OpenWhiskFunctionInitializer.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-openwhisk/src/main/java/org/springframework/cloud/function/adapter/openwhisk/OpenWhiskFunctionInitializer.java index fc5c03e10..a151c0298 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-openwhisk/src/main/java/org/springframework/cloud/function/adapter/openwhisk/OpenWhiskFunctionInitializer.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-openwhisk/src/main/java/org/springframework/cloud/function/adapter/openwhisk/OpenWhiskFunctionInitializer.java @@ -23,6 +23,7 @@ import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.function.context.FunctionCatalog; @@ -39,11 +40,11 @@ public class OpenWhiskFunctionInitializer { private static Log logger = LogFactory.getLog(OpenWhiskFunctionInitializer.class); - private Function, Flux> function; + private Function, Publisher> function; - private Consumer> consumer; + private Consumer> consumer; - private Supplier> supplier; + private Supplier> supplier; private AtomicBoolean initialized = new AtomicBoolean(); @@ -86,7 +87,7 @@ public class OpenWhiskFunctionInitializer { : (this.consumer != null ? this.consumer : this.supplier); } - protected Flux apply(Flux input) { + protected Publisher apply(Publisher input) { if (this.function != null) { return function.apply(input); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/FluxWrapperDetector.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/FluxWrapperDetector.java index f92c3daba..1291a1c12 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/FluxWrapperDetector.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/FluxWrapperDetector.java @@ -22,9 +22,6 @@ import org.reactivestreams.Publisher; import org.springframework.cloud.function.context.WrapperDetector; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - /** * @author Dave Syer * @@ -33,8 +30,11 @@ public class FluxWrapperDetector implements WrapperDetector { @Override public boolean isWrapper(Type type) { - return Publisher.class.equals(type) || Flux.class.equals(type) - || Mono.class.equals(type); + if (type instanceof Class) { + Class cls = (Class) type; + return Publisher.class.isAssignableFrom(cls); + } + return false; } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/AbstractStreamListeningInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/AbstractStreamListeningInvoker.java index 0daa61bc8..a4741b7e0 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/AbstractStreamListeningInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/AbstractStreamListeningInvoker.java @@ -80,7 +80,7 @@ public abstract class AbstractStreamListeningInvoker } protected Mono consumer(String name, Flux> flux) { - Consumer consumer = functionCatalog.lookup(Consumer.class, name); + Consumer> consumer = functionCatalog.lookup(Consumer.class, name); flux = flux.publish().refCount(2); // The consumer will subscribe to the input flux, so we need to listen separately consumer.accept(flux.map(message -> convertInput(consumer).apply(message)) @@ -89,7 +89,7 @@ public abstract class AbstractStreamListeningInvoker } protected Flux> function(String name, Flux> flux) { - Function> function = functionCatalog.lookup(Function.class, name); + Function, Publisher> function = functionCatalog.lookup(Function.class, name); return flux.publish(values -> { Publisher result = function .apply(values.map(message -> convertInput(function).apply(message))); diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.java index 668238a64..54981d5c6 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; +import org.reactivestreams.Publisher; + import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.stream.messaging.Source; @@ -93,12 +95,12 @@ public class SupplierInvokingMessageProducer extends MessageProducerSupport { if (!disposables.containsKey(name)) { synchronized (disposables) { if (!disposables.containsKey(name)) { - Supplier> supplier = functionCatalog.lookup(Supplier.class, + Supplier> supplier = functionCatalog.lookup(Supplier.class, name); if (supplier != null) { suppliers.add(name); disposables.put(name, - supplier.get().subscribeOn(Schedulers.elastic()) + Flux.from(supplier.get()).subscribeOn(Schedulers.elastic()) .subscribe(m -> send(name, m))); } } @@ -107,7 +109,7 @@ public class SupplierInvokingMessageProducer extends MessageProducerSupport { } private void send(String name, Object payload) { - Supplier> supplier = functionCatalog.lookup(Supplier.class, name); + Supplier> supplier = functionCatalog.lookup(Supplier.class, name); Message message = MessageUtils.unpack(supplier, payload); message = MessageBuilder.fromMessage(message) .setHeaderIfAbsent(StreamConfigurationProperties.ROUTE_KEY, name).build(); diff --git a/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskConfiguration.java b/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskConfiguration.java index b3e93a5bb..9dfa95a99 100644 --- a/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskConfiguration.java +++ b/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskConfiguration.java @@ -20,6 +20,8 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.reactivestreams.Publisher; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -29,7 +31,6 @@ import org.springframework.cloud.task.configuration.EnableTask; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -46,11 +47,11 @@ public class TaskConfiguration { @Bean public CommandLineRunner commandLineRunner(FunctionCatalog registry) { - final Supplier> supplier = registry.lookup(Supplier.class, + final Supplier> supplier = registry.lookup(Supplier.class, properties.getSupplier()); - final Function, Flux> function = registry + final Function, Publisher> function = registry .lookup(Function.class, properties.getFunction()); - final Consumer> consumer = consumer(registry); + final Consumer> consumer = consumer(registry); CommandLineRunner runner = new CommandLineRunner() { @Override @@ -61,14 +62,14 @@ public class TaskConfiguration { return runner; } - private Consumer> consumer(FunctionCatalog registry) { - Consumer> consumer = registry.lookup(Consumer.class, + private Consumer> consumer(FunctionCatalog registry) { + Consumer> consumer = registry.lookup(Consumer.class, properties.getConsumer()); if (consumer != null) { return consumer; } - Function, Mono> function = registry.lookup(Function.class, + Function, Publisher> function = registry.lookup(Function.class, properties.getConsumer()); - return flux -> function.apply(flux).subscribe(); + return flux -> Mono.from(function.apply(flux)).subscribe(); } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java index 9518c52ad..e760245f9 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java @@ -73,10 +73,10 @@ public class FunctionController { @RequestBody FluxRequest body) { @SuppressWarnings("unchecked") - Function, Flux> function = (Function, Flux>) request + Function, Publisher> function = (Function, Publisher>) request .getAttribute(WebRequestConstants.FUNCTION, WebRequest.SCOPE_REQUEST); @SuppressWarnings("unchecked") - Consumer> consumer = (Consumer>) request + Consumer> consumer = (Consumer>) request .getAttribute(WebRequestConstants.CONSUMER, WebRequest.SCOPE_REQUEST); Boolean single = (Boolean) request.getAttribute(WebRequestConstants.INPUT_SINGLE, WebRequest.SCOPE_REQUEST); @@ -116,7 +116,7 @@ public class FunctionController { } private Publisher response(WebRequest request, Object handler, Boolean single, - Flux result) { + Publisher result) { if (single != null && single && isOutputSingle(handler)) { request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, true, @@ -146,10 +146,10 @@ public class FunctionController { @ResponseBody public Publisher get(WebRequest request) { @SuppressWarnings("unchecked") - Function, Flux> function = (Function, Flux>) request + Function, Publisher> function = (Function, Publisher>) request .getAttribute(WebRequestConstants.FUNCTION, WebRequest.SCOPE_REQUEST); @SuppressWarnings("unchecked") - Supplier> supplier = (Supplier>) request + Supplier> supplier = (Supplier>) request .getAttribute(WebRequestConstants.SUPPLIER, WebRequest.SCOPE_REQUEST); String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT, WebRequest.SCOPE_REQUEST); @@ -160,15 +160,15 @@ public class FunctionController { return response(request, supplier, true, supplier(supplier)); } - private Flux supplier(Supplier> supplier) { - Flux result = supplier.get(); + private Publisher supplier(Supplier> supplier) { + Publisher result = supplier.get(); if (logger.isDebugEnabled()) { logger.debug("Handled GET with supplier"); } - return debug ? result.log() : result; + return debug ? Flux.from(result).log() : result; } - private Mono value(Function, Flux> function, String value) { + private Mono value(Function, Publisher> function, String value) { Object input = converter.convert(function, value); Mono result = Mono.from(function.apply(Flux.just(input))); if (logger.isDebugEnabled()) { diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java index 28c5d7a72..d2bf970f7 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java @@ -22,6 +22,8 @@ import java.util.function.Supplier; import javax.servlet.http.HttpServletRequest; +import org.reactivestreams.Publisher; + import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -118,7 +120,7 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping return null; } path = path.startsWith("/") ? path.substring(1) : path; - Consumer consumer = functions.lookup(Consumer.class, path); + Consumer> consumer = functions.lookup(Consumer.class, path); if (consumer != null) { request.setAttribute(WebRequestConstants.CONSUMER, consumer); return consumer; @@ -136,7 +138,7 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping return null; } path = path.startsWith("/") ? path.substring(1) : path; - Supplier supplier = functions.lookup(Supplier.class, path); + Supplier> supplier = functions.lookup(Supplier.class, path); if (supplier != null) { request.setAttribute(WebRequestConstants.SUPPLIER, supplier); return supplier;