GH-208 Added support for composing Supplier and Consumer

This essentially returns a terminal Supplier - Supplier<Flux<Void>> which can no longer be composed with anything else
Resolves #208
This commit is contained in:
Oleg Zhurakousky
2018-10-14 19:29:23 -04:00
parent 666bc100e8
commit ec68f6453c
3 changed files with 52 additions and 13 deletions

View File

@@ -32,9 +32,6 @@ import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
@@ -70,6 +67,11 @@ import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
* @author Mark Fisher
@@ -327,12 +329,21 @@ public class ContextFunctionCatalogAutoConfiguration {
@SuppressWarnings("unchecked")
private Object compose(Object a, Object b) {
if (a instanceof Supplier && b instanceof Function) {
Supplier<Flux<Object>> supplier = (Supplier<Flux<Object>>) a;
if (b instanceof FluxConsumer) {
throw new UnsupportedOperationException("Composing Supplier and Consumer is not supported at the moment");
if (supplier instanceof FluxSupplier) {
FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>)b);
return (Supplier<Flux<Void>>) () -> supplier.get().compose(v -> fConsumer.apply(supplier.get()));
}
else {
throw new IllegalStateException("The provided supplier is terminal (i.e., already composed with Consumer) "
+ "therefore it can not be composed with another consumer");
}
}
else {
Function<Object, Object> function = (Function<Object, Object>) b;
return (Supplier<Object>) () -> function.apply(supplier.get());
}
Supplier<Object> supplier = (Supplier<Object>) a;
Function<Object, Object> function = (Function<Object, Object>) b;
return (Supplier<Object>) () -> function.apply(supplier.get());
}
else if (a instanceof Function && b instanceof Function) {
Function<Object, Object> function1 = (Function<Object, Object>) a;