Share unrouted messages amongst consumers and functions

This commit is contained in:
Dave Syer
2018-03-20 09:52:31 +00:00
parent 0be5f14766
commit 9012143093
5 changed files with 83 additions and 50 deletions

View File

@@ -62,7 +62,8 @@ public class StreamAutoConfiguration {
@Bean
public SupplierInvokingMessageProducer<Object> supplierInvoker(
FunctionCatalog registry) {
return new SupplierInvokingMessageProducer<Object>(registry, properties.getSource().getName());
return new SupplierInvokingMessageProducer<Object>(registry,
properties.getSource().getName());
}
}
@@ -80,7 +81,8 @@ public class StreamAutoConfiguration {
FunctionInspector functionInspector,
@Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) {
return new StreamListeningFunctionInvoker(registry, functionInspector,
compositeMessageConverterFactory, properties.getDefaultRoute());
compositeMessageConverterFactory, properties.getDefaultRoute(),
properties.isShared());
}
}
@@ -91,7 +93,7 @@ public class StreamAutoConfiguration {
@Autowired
private StreamConfigurationProperties properties;
public SinkConfiguration() {
}
@@ -100,7 +102,8 @@ public class StreamAutoConfiguration {
FunctionInspector functionInspector,
@Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) {
return new StreamListeningConsumerInvoker(registry, functionInspector,
compositeMessageConverterFactory, properties.getSink().getName());
compositeMessageConverterFactory, properties.getSink().getName(),
properties.isShared());
}
}
@@ -125,7 +128,7 @@ public class StreamAutoConfiguration {
private static class SinkOnlyCondition extends SpringBootCondition {
private SourceAndSinkCondition processor = new SourceAndSinkCondition();
private SinkCondition sink = new SinkCondition();
@Override
@@ -137,7 +140,8 @@ public class StreamAutoConfiguration {
if (sink.matches(context, metadata)) {
return ConditionOutcome.match("Sink is explicitly enabled");
}
return ConditionOutcome.noMatch("Sink is not enabled and not available through Processor");
return ConditionOutcome
.noMatch("Sink is not enabled and not available through Processor");
}
}

View File

@@ -30,6 +30,8 @@ public class StreamConfigurationProperties {
private Processor processor = new Processor();
private boolean shared;
public static final String ROUTE_KEY = "stream_routekey";
public Sink getSink() {
@@ -44,6 +46,14 @@ public class StreamConfigurationProperties {
return this.processor;
}
public boolean isShared() {
return this.shared;
}
public void setShared(boolean shared) {
this.shared = shared;
}
public static class Sink {
/**
@@ -72,6 +82,7 @@ public class StreamConfigurationProperties {
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}
public static class Source {
@@ -149,10 +160,11 @@ public class StreamConfigurationProperties {
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}
public String getDefaultRoute() {
return processor.getName()!=null ? processor.getName() : sink.getName();
return processor.getName() != null ? processor.getName() : sink.getName();
}
}

View File

@@ -37,6 +37,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Dave Syer
@@ -55,19 +56,21 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto
private final Map<String, FluxMessageProcessor> processors = new HashMap<>();
private int count = -1;
private static final FluxMessageProcessor NOENDPOINT = flux -> Flux.empty();
private static final FluxMessageProcessor NOENDPOINT = flux -> Mono.empty();
private static final Object UNCONVERTED = new Object();
private boolean share;
public StreamListeningConsumerInvoker(FunctionCatalog functionCatalog,
FunctionInspector functionInspector,
CompositeMessageConverterFactory converterFactory, String defaultRoute) {
CompositeMessageConverterFactory converterFactory, String defaultRoute,
boolean share) {
this.functionCatalog = functionCatalog;
this.functionInspector = functionInspector;
this.converterFactory = converterFactory;
this.defaultRoute = defaultRoute;
this.share = share;
}
@Override
@@ -77,32 +80,35 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto
@StreamListener
public void handle(@Input(Processor.INPUT) Flux<Message<?>> input) {
input.groupBy(this::select).flatMap(group -> group.key().process(group)).subscribe();
input.groupBy(this::select).flatMap(group -> group.key().process(group))
.subscribe();
}
private Flux<Message<?>> consumer(String name, Flux<Message<?>> flux) {
private Mono<Void> consumer(String name, Flux<Message<?>> flux) {
Consumer<Object> consumer = functionCatalog.lookup(Consumer.class, name);
consumer.accept(flux.map(message -> convertInput(consumer).apply(message))
.filter(transformed -> transformed != UNCONVERTED));
return Flux.empty();
return Mono.empty();
}
private Flux<Message<?>> balance(List<String> names, Flux<Message<?>> flux) {
private Mono<Void> balance(List<String> names, Flux<Message<?>> flux) {
if (names.isEmpty()) {
return Flux.empty();
return Mono.empty();
}
String name = choose(names);
if (functionCatalog.lookup(Consumer.class, name) != null) {
return consumer(name, flux);
Flux<?> result = Flux.empty();
if (names.size() > 1) {
if (this.share) {
flux = flux.share();
}
else {
return Mono.error(new IllegalStateException(
"Multiple matches and share disabled: " + names));
}
}
return Flux.empty();
}
private synchronized String choose(List<String> names) {
if (++count >= names.size() || count < 0) {
count = 0;
for (String name : names) {
result = result.zipWith(consumer(name, flux));
}
return names.get(count);
return result.then();
}
private FluxMessageProcessor select(Message<?> input) {
@@ -188,7 +194,7 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto
}
interface FluxMessageProcessor {
Flux<Message<?>> process(Flux<Message<?>> flux);
Mono<Void> process(Flux<Message<?>> flux);
}
}

View File

@@ -60,19 +60,21 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
private final Map<String, FluxMessageProcessor> processors = new HashMap<>();
private int count = -1;
private static final FluxMessageProcessor NOENDPOINT = flux -> Flux.empty();
private static final Object UNCONVERTED = new Object();
private boolean share;
public StreamListeningFunctionInvoker(FunctionCatalog functionCatalog,
FunctionInspector functionInspector,
CompositeMessageConverterFactory converterFactory, String defaultRoute) {
CompositeMessageConverterFactory converterFactory, String defaultRoute,
boolean share) {
this.functionCatalog = functionCatalog;
this.functionInspector = functionInspector;
this.converterFactory = converterFactory;
this.defaultRoute = defaultRoute;
this.share = share;
}
@Override
@@ -96,7 +98,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
result = result.map(message -> MessageUtils.unpack(function, message));
}
Flux<Map<String, Object>> aggregate = headers(values);
return result.withLatestFrom(aggregate, (p, m) -> message(p, m));
return aggregate.withLatestFrom(result,
(map, payload) -> message(map, payload));
});
}
@@ -104,10 +107,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
return flux.map(message -> message.getHeaders());
}
private Message<?> message(Object result, Map<String, Object> headers) {
private Message<?> message(Map<String, Object> headers, Object result) {
return result instanceof Message
// TODO: why do we have to do this? The headers should have come with the
// result.
? MessageBuilder.fromMessage((Message<?>) result)
.copyHeadersIfAbsent(headers).build()
: MessageBuilder.withPayload(result).copyHeadersIfAbsent(headers).build();
@@ -115,27 +116,37 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
private Flux<Message<?>> consumer(String name, Flux<Message<?>> flux) {
Consumer<Object> consumer = functionCatalog.lookup(Consumer.class, name);
flux = flux.publish().refCount(2);
// The consumer will subscribe to the input flux, so we need to listen separately
consumer.accept(flux.map(message -> convertInput(consumer).apply(message))
.filter(transformed -> transformed != UNCONVERTED));
return Flux.empty();
return flux.ignoreElements().flux();
}
private Flux<Message<?>> balance(List<String> names, Flux<Message<?>> flux) {
if (names.isEmpty()) {
return Flux.empty();
}
String name = choose(names);
if (functionCatalog.lookup(Consumer.class, name) != null) {
return consumer(name, flux);
flux = flux.hide();
Flux<Message<?>> result = Flux.empty();
if (names.size() > 1) {
if (this.share) {
flux = flux.publish().refCount(names.size());
}
else {
return Flux.error(new IllegalStateException(
"Multiple matches and share disabled: " + names));
}
}
return function(name, flux);
}
private synchronized String choose(List<String> names) {
if (++count >= names.size() || count < 0) {
count = 0;
for (String name : names) {
if (functionCatalog.lookup(Consumer.class, name) != null) {
result = result.mergeWith(consumer(name, flux));
}
else {
result = result.mergeWith(function(name, flux));
}
}
return names.get(count);
return result;
}
private FluxMessageProcessor select(Message<?> input) {
@@ -149,7 +160,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
name = stash(defaultRoute);
}
if (name == null) {
Set<String> names = new LinkedHashSet<>(functionCatalog.getNames(Function.class));
Set<String> names = new LinkedHashSet<>(
functionCatalog.getNames(Function.class));
names.addAll(functionCatalog.getNames(Consumer.class));
List<String> matches = new ArrayList<>();
if (names.size() == 1) {
@@ -175,7 +187,6 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
name = stash(matches.iterator().next());
}
else {
// TODO: do we really want this? Or maybe warn that it is happening?
return flux -> balance(matches, flux);
}
}

View File

@@ -43,7 +43,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PojoStreamingMixedTests.StreamingFunctionApplication.class)
@SpringBootTest(classes = PojoStreamingMixedTests.StreamingFunctionApplication.class, properties = "spring.cloud.function.stream.shared=true")
public class PojoStreamingMixedTests {
@Autowired
@@ -69,8 +69,8 @@ public class PojoStreamingMixedTests {
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000,
TimeUnit.MILLISECONDS);
assertThat(result.getPayload()).isInstanceOf(Foo.class);
// 2 subscribers to the same channel so input messages are applied as round robin
assertThat(collector).hasSize(1);
// 2 subscribers to the same channel but input messages are sent to all
assertThat(collector).hasSize(2);
}
@Test