diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 8082b17f9..1e8321b87 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -32,9 +32,6 @@ import java.util.stream.Stream; import javax.annotation.PreDestroy; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.gson.Gson; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanDefinition; @@ -70,6 +67,11 @@ import org.springframework.stereotype.Component; import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; + +import reactor.core.publisher.Flux; + /** * @author Dave Syer * @author Mark Fisher @@ -327,12 +329,21 @@ public class ContextFunctionCatalogAutoConfiguration { @SuppressWarnings("unchecked") private Object compose(Object a, Object b) { if (a instanceof Supplier && b instanceof Function) { + Supplier> supplier = (Supplier>) a; if (b instanceof FluxConsumer) { - throw new UnsupportedOperationException("Composing Supplier and Consumer is not supported at the moment"); + if (supplier instanceof FluxSupplier) { + FluxConsumer fConsumer = ((FluxConsumer)b); + return (Supplier>) () -> supplier.get().compose(v -> fConsumer.apply(supplier.get())); + } + else { + throw new IllegalStateException("The provided supplier is terminal (i.e., already composed with Consumer) " + + "therefore it can not be composed with another consumer"); + } + } + else { + Function function = (Function) b; + return (Supplier) () -> function.apply(supplier.get()); } - Supplier supplier = (Supplier) a; - Function function = (Function) b; - return (Supplier) () -> function.apply(supplier.get()); } else if (a instanceof Function && b instanceof Function) { Function function1 = (Function) a; diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java index 22029971e..95153c14e 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java @@ -16,23 +16,25 @@ package org.springframework.cloud.function.context.config; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicStampedReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import org.junit.Test; - +import org.reactivestreams.Publisher; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionType; import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.BeanFactoryFunctionCatalog; import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.ContextFunctionRegistry; -import static org.assertj.core.api.Assertions.assertThat; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -153,6 +155,30 @@ public class BeanFactoryFunctionCatalogTests { assertThat(sink.values).contains("2"); } + @Test + public void composeSupplierAndConsumer() { + AtomicReference ref = new AtomicReference(); + Supplier s = () -> "hello"; + processor.register(new FunctionRegistration<>(s, "supplier")); + Consumer c = x -> ref.set(x.toUpperCase()); + processor.register(new FunctionRegistration<>(c, "consumer")); + Supplier> f = processor.lookup("supplier|consumer"); + f.get().blockFirst(); + assertThat(ref.get()).isEqualTo("HELLO"); + } + + @Test(expected=IllegalStateException.class) + public void failComposeSupplierWithMultipleConsumers() { + AtomicReference ref = new AtomicReference(); + Supplier s = () -> "hello"; + processor.register(new FunctionRegistration<>(s, "supplier")); + Consumer c = x -> ref.set(x.toUpperCase()); + processor.register(new FunctionRegistration<>(c, "consumer")); + Consumer z = x -> ref.set(x.toUpperCase()); + processor.register(new FunctionRegistration<>(z, "z")); + processor.lookup("supplier|consumer|z"); + } + protected static class Source implements Supplier { @Override diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java index 548d167e8..703f4101e 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java @@ -17,6 +17,7 @@ package org.springframework.cloud.function.context.config; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNull; import java.io.File; import java.net.MalformedURLException; @@ -115,12 +116,13 @@ public class ContextFunctionPostProcessorTests { assertThat(processor.getRegistration(supplier).getNames()).containsExactly("supplier|function"); } - //TODO we should support it at some point since this is really a Runnable - @Test(expected=UnsupportedOperationException.class) + @SuppressWarnings("unchecked") + @Test public void supplierAndConsumer() { processor.register(new FunctionRegistration>(() -> "foo", "supplier")); processor.register(new FunctionRegistration>(System.out::println, "consumer")); - processor.lookupSupplier("supplier|consumer"); + Supplier> supplier = (Supplier>) processor.lookupSupplier("supplier|consumer"); + assertNull(supplier.get().blockFirst()); } @Test