GH-208 Added support for composing Supplier and Consumer
This essentially returns a terminal Supplier - Supplier<Flux<Void>> which can no longer be composed with anything else Resolves #208
This commit is contained in:
@@ -32,9 +32,6 @@ import java.util.stream.Stream;
|
|||||||
|
|
||||||
import javax.annotation.PreDestroy;
|
import javax.annotation.PreDestroy;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.google.gson.Gson;
|
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.beans.factory.config.BeanDefinition;
|
import org.springframework.beans.factory.config.BeanDefinition;
|
||||||
@@ -70,6 +67,11 @@ import org.springframework.stereotype.Component;
|
|||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.gson.Gson;
|
||||||
|
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Dave Syer
|
* @author Dave Syer
|
||||||
* @author Mark Fisher
|
* @author Mark Fisher
|
||||||
@@ -327,12 +329,21 @@ public class ContextFunctionCatalogAutoConfiguration {
|
|||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private Object compose(Object a, Object b) {
|
private Object compose(Object a, Object b) {
|
||||||
if (a instanceof Supplier && b instanceof Function) {
|
if (a instanceof Supplier && b instanceof Function) {
|
||||||
|
Supplier<Flux<Object>> supplier = (Supplier<Flux<Object>>) a;
|
||||||
if (b instanceof FluxConsumer) {
|
if (b instanceof FluxConsumer) {
|
||||||
throw new UnsupportedOperationException("Composing Supplier and Consumer is not supported at the moment");
|
if (supplier instanceof FluxSupplier) {
|
||||||
|
FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>)b);
|
||||||
|
return (Supplier<Flux<Void>>) () -> supplier.get().compose(v -> fConsumer.apply(supplier.get()));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IllegalStateException("The provided supplier is terminal (i.e., already composed with Consumer) "
|
||||||
|
+ "therefore it can not be composed with another consumer");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Function<Object, Object> function = (Function<Object, Object>) b;
|
||||||
|
return (Supplier<Object>) () -> function.apply(supplier.get());
|
||||||
}
|
}
|
||||||
Supplier<Object> supplier = (Supplier<Object>) a;
|
|
||||||
Function<Object, Object> function = (Function<Object, Object>) b;
|
|
||||||
return (Supplier<Object>) () -> function.apply(supplier.get());
|
|
||||||
}
|
}
|
||||||
else if (a instanceof Function && b instanceof Function) {
|
else if (a instanceof Function && b instanceof Function) {
|
||||||
Function<Object, Object> function1 = (Function<Object, Object>) a;
|
Function<Object, Object> function1 = (Function<Object, Object>) a;
|
||||||
|
|||||||
@@ -16,23 +16,25 @@
|
|||||||
|
|
||||||
package org.springframework.cloud.function.context.config;
|
package org.springframework.cloud.function.context.config;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.atomic.AtomicStampedReference;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.reactivestreams.Publisher;
|
||||||
import org.springframework.cloud.function.context.FunctionRegistration;
|
import org.springframework.cloud.function.context.FunctionRegistration;
|
||||||
import org.springframework.cloud.function.context.FunctionType;
|
import org.springframework.cloud.function.context.FunctionType;
|
||||||
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.BeanFactoryFunctionCatalog;
|
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.BeanFactoryFunctionCatalog;
|
||||||
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.ContextFunctionRegistry;
|
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.ContextFunctionRegistry;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
|
||||||
|
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
@@ -153,6 +155,30 @@ public class BeanFactoryFunctionCatalogTests {
|
|||||||
assertThat(sink.values).contains("2");
|
assertThat(sink.values).contains("2");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void composeSupplierAndConsumer() {
|
||||||
|
AtomicReference<String> ref = new AtomicReference<String>();
|
||||||
|
Supplier<String> s = () -> "hello";
|
||||||
|
processor.register(new FunctionRegistration<>(s, "supplier"));
|
||||||
|
Consumer<String> c = x -> ref.set(x.toUpperCase());
|
||||||
|
processor.register(new FunctionRegistration<>(c, "consumer"));
|
||||||
|
Supplier<Flux<Void>> f = processor.lookup("supplier|consumer");
|
||||||
|
f.get().blockFirst();
|
||||||
|
assertThat(ref.get()).isEqualTo("HELLO");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected=IllegalStateException.class)
|
||||||
|
public void failComposeSupplierWithMultipleConsumers() {
|
||||||
|
AtomicReference<String> ref = new AtomicReference<String>();
|
||||||
|
Supplier<String> s = () -> "hello";
|
||||||
|
processor.register(new FunctionRegistration<>(s, "supplier"));
|
||||||
|
Consumer<String> c = x -> ref.set(x.toUpperCase());
|
||||||
|
processor.register(new FunctionRegistration<>(c, "consumer"));
|
||||||
|
Consumer<String> z = x -> ref.set(x.toUpperCase());
|
||||||
|
processor.register(new FunctionRegistration<>(z, "z"));
|
||||||
|
processor.lookup("supplier|consumer|z");
|
||||||
|
}
|
||||||
|
|
||||||
protected static class Source implements Supplier<Integer> {
|
protected static class Source implements Supplier<Integer> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -17,6 +17,7 @@
|
|||||||
package org.springframework.cloud.function.context.config;
|
package org.springframework.cloud.function.context.config;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
@@ -115,12 +116,13 @@ public class ContextFunctionPostProcessorTests {
|
|||||||
assertThat(processor.getRegistration(supplier).getNames()).containsExactly("supplier|function");
|
assertThat(processor.getRegistration(supplier).getNames()).containsExactly("supplier|function");
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO we should support it at some point since this is really a Runnable
|
@SuppressWarnings("unchecked")
|
||||||
@Test(expected=UnsupportedOperationException.class)
|
@Test
|
||||||
public void supplierAndConsumer() {
|
public void supplierAndConsumer() {
|
||||||
processor.register(new FunctionRegistration<Supplier<String>>(() -> "foo", "supplier"));
|
processor.register(new FunctionRegistration<Supplier<String>>(() -> "foo", "supplier"));
|
||||||
processor.register(new FunctionRegistration<Consumer<String>>(System.out::println, "consumer"));
|
processor.register(new FunctionRegistration<Consumer<String>>(System.out::println, "consumer"));
|
||||||
processor.lookupSupplier("supplier|consumer");
|
Supplier<Flux<Void>> supplier = (Supplier<Flux<Void>>) processor.lookupSupplier("supplier|consumer");
|
||||||
|
assertNull(supplier.get().blockFirst());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
Reference in New Issue
Block a user