diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 1e8321b87..321bbea0b 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -71,6 +71,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author Dave Syer @@ -333,10 +334,10 @@ public class ContextFunctionCatalogAutoConfiguration { if (b instanceof FluxConsumer) { if (supplier instanceof FluxSupplier) { FluxConsumer fConsumer = ((FluxConsumer)b); - return (Supplier>) () -> supplier.get().compose(v -> fConsumer.apply(supplier.get())); + return (Supplier>) () -> Mono.from(supplier.get().compose(v -> fConsumer.apply(supplier.get()))); } else { - throw new IllegalStateException("The provided supplier is terminal (i.e., already composed with Consumer) " + throw new IllegalStateException("The provided supplier is finite (i.e., already composed with Consumer) " + "therefore it can not be composed with another consumer"); } } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java index de5241060..708360c47 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java @@ -162,8 +162,8 @@ public class BeanFactoryFunctionCatalogTests { processor.register(new FunctionRegistration<>(s, "supplier")); Consumer c = x -> ref.set(x.toUpperCase()); processor.register(new FunctionRegistration<>(c, "consumer")); - Supplier> f = processor.lookup("supplier|consumer"); - f.get().blockFirst(); + Supplier> f = processor.lookup("supplier|consumer"); + ((Mono)f.get()).block(); assertThat(ref.get()).isEqualTo("HELLO"); } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java index 703f4101e..ee4a68c8b 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java @@ -121,8 +121,8 @@ public class ContextFunctionPostProcessorTests { public void supplierAndConsumer() { processor.register(new FunctionRegistration>(() -> "foo", "supplier")); processor.register(new FunctionRegistration>(System.out::println, "consumer")); - Supplier> supplier = (Supplier>) processor.lookupSupplier("supplier|consumer"); - assertNull(supplier.get().blockFirst()); + Supplier> supplier = (Supplier>) processor.lookupSupplier("supplier|consumer"); + assertNull(supplier.get().block()); } @Test