Convert Consumer<Foo> to Function<Flux<Foo>,Mono<Void>>

This results in a better experience for users because the consumer
that they write is only applied to a Flux that is subscribed to
by the framework once. It gives better control over the flow of
foos, e.g. if some component wants to subscribe on a thread.
This commit is contained in:
Dave Syer
2018-03-26 10:06:13 +01:00
parent a1b624b28a
commit 5aeba1ea96
13 changed files with 144 additions and 76 deletions

View File

@@ -24,6 +24,8 @@ import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
@@ -89,10 +91,11 @@ public abstract class AbstractStreamListeningInvoker
protected Flux<Message<?>> function(String name, Flux<Message<?>> flux) {
Function<Object, Flux<?>> function = functionCatalog.lookup(Function.class, name);
return flux.publish(values -> {
Flux<?> result = function
Publisher<?> result = function
.apply(values.map(message -> convertInput(function).apply(message)));
if (this.functionInspector.isMessage(function)) {
result = result.map(message -> MessageUtils.unpack(function, message));
result = Flux.from(result)
.map(message -> MessageUtils.unpack(function, message));
}
Flux<Map<String, Object>> aggregate = headers(values);
return aggregate.withLatestFrom(result,

View File

@@ -23,11 +23,9 @@ import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.reactive.FluxSender;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Mark Fisher
@@ -43,10 +41,9 @@ public class StreamListeningFunctionInvoker extends AbstractStreamListeningInvok
}
@StreamListener
public Mono<Void> handle(@Input(Processor.INPUT) Flux<Message<?>> input,
@Output(Processor.OUTPUT) FluxSender output) {
return output.send(
input.groupBy(this::select).flatMap(group -> group.key().process(group)));
@Output(Processor.OUTPUT)
public Flux<Message<?>> handle(@Input(Processor.INPUT) Flux<Message<?>> input) {
return input.groupBy(this::select).flatMap(group -> group.key().process(group));
}
}