Prefer explicit route to default

This commit is contained in:
Dave Syer
2017-11-24 13:21:08 +09:00
parent 0e21a30459
commit f3951cba66
5 changed files with 38 additions and 11 deletions

View File

@@ -131,16 +131,14 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
}
private FluxMessageProcessor select(Message<?> input) {
String name = defaultEndpoint;
if (name != null) {
name = stash(name);
String name = null;
if (input.getHeaders().containsKey(StreamConfigurationProperties.ROUTE_KEY)) {
String key = (String) input.getHeaders()
.get(StreamConfigurationProperties.ROUTE_KEY);
name = stash(key);
}
if (name == null) {
if (input.getHeaders().containsKey(StreamConfigurationProperties.ROUTE_KEY)) {
String key = (String) input.getHeaders()
.get(StreamConfigurationProperties.ROUTE_KEY);
name = stash(key);
}
if (name==null && defaultEndpoint != null) {
name = stash(defaultEndpoint);
}
if (name == null) {
Set<String> names = new LinkedHashSet<>(functionCatalog.getFunctionNames());

View File

@@ -16,7 +16,10 @@
package org.springframework.cloud.function.stream.mixed;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -26,6 +29,7 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.function.stream.StreamConfigurationProperties;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.annotation.Bean;
@@ -49,9 +53,12 @@ public class PojoStreamingExplicitEndpointTests {
@Autowired
MessageCollector messageCollector;
@Autowired
StreamingFunctionApplication app;
@Test
public void test() throws Exception {
public void testDefaultEndpoint() throws Exception {
processor.input()
.send(MessageBuilder.withPayload("{\"name\":\"hello\"}").build());
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000,
@@ -59,8 +66,17 @@ public class PojoStreamingExplicitEndpointTests {
assertThat(result.getPayload()).isInstanceOf(Foo.class);
}
@Test
public void testRoutingBeatsDefaultEndpoint() throws Exception {
processor.input()
.send(MessageBuilder.withPayload("{\"name\":\"hello\"}").setHeader(StreamConfigurationProperties.ROUTE_KEY, "sink").build());
assertThat(app.foos).hasSize(1);
}
@SpringBootApplication
public static class StreamingFunctionApplication {
private List<Foo> foos = new ArrayList<>();
@Bean
public Function<Foo, Foo> uppercase() {
@@ -72,6 +88,11 @@ public class PojoStreamingExplicitEndpointTests {
return () -> new Foo("world");
}
@Bean
public Consumer<Foo> sink() {
return foo -> foos.add(foo);
}
}
protected static class Foo {