Only expose Publisher via FunctionCatalog

Flux.from() is cheap and can be used to marshal the inputs everywhere
internally. With this change users ought to be able to register any
function of any Publisher type.
This commit is contained in:
Dave Syer
2018-05-01 11:46:29 -04:00
parent fb04324ac9
commit b59b43ddc5
14 changed files with 81 additions and 59 deletions

View File

@@ -80,7 +80,7 @@ public abstract class AbstractStreamListeningInvoker
}
protected Mono<Void> consumer(String name, Flux<Message<?>> flux) {
Consumer<Object> consumer = functionCatalog.lookup(Consumer.class, name);
Consumer<Publisher<?>> consumer = functionCatalog.lookup(Consumer.class, name);
flux = flux.publish().refCount(2);
// The consumer will subscribe to the input flux, so we need to listen separately
consumer.accept(flux.map(message -> convertInput(consumer).apply(message))
@@ -89,7 +89,7 @@ public abstract class AbstractStreamListeningInvoker
}
protected Flux<Message<?>> function(String name, Flux<Message<?>> flux) {
Function<Object, Flux<?>> function = functionCatalog.lookup(Function.class, name);
Function<Publisher<?>, Publisher<?>> function = functionCatalog.lookup(Function.class, name);
return flux.publish(values -> {
Publisher<?> result = function
.apply(values.map(message -> convertInput(function).apply(message)));

View File

@@ -22,6 +22,8 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.stream.messaging.Source;
@@ -93,12 +95,12 @@ public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
if (!disposables.containsKey(name)) {
synchronized (disposables) {
if (!disposables.containsKey(name)) {
Supplier<Flux<?>> supplier = functionCatalog.lookup(Supplier.class,
Supplier<Publisher<?>> supplier = functionCatalog.lookup(Supplier.class,
name);
if (supplier != null) {
suppliers.add(name);
disposables.put(name,
supplier.get().subscribeOn(Schedulers.elastic())
Flux.from(supplier.get()).subscribeOn(Schedulers.elastic())
.subscribe(m -> send(name, m)));
}
}
@@ -107,7 +109,7 @@ public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
}
private void send(String name, Object payload) {
Supplier<Flux<?>> supplier = functionCatalog.lookup(Supplier.class, name);
Supplier<Publisher<?>> supplier = functionCatalog.lookup(Supplier.class, name);
Message<?> message = MessageUtils.unpack(supplier, payload);
message = MessageBuilder.fromMessage(message)
.setHeaderIfAbsent(StreamConfigurationProperties.ROUTE_KEY, name).build();