From f3951cba661cb893c2a1475beb0d6ee420bb5bb1 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Fri, 24 Nov 2017 13:21:08 +0900 Subject: [PATCH] Prefer explicit route to default --- ...ntextFunctionCatalogAutoConfiguration.java | 2 +- .../BeanFactoryFunctionCatalogTests.java | 7 ++++++ .../src/main/resources/application.properties | 1 + .../StreamListeningFunctionInvoker.java | 16 ++++++------- .../PojoStreamingExplicitEndpointTests.java | 23 ++++++++++++++++++- 5 files changed, 38 insertions(+), 11 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java index e748dbfef..e3011c88d 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java @@ -250,7 +250,7 @@ public class ContextFunctionCatalogAutoConfiguration { if (stages.length == 0 && source.size() == 1) { stages = new String[] { source.keySet().iterator().next() }; } - Object function = lookup(stages[0], source); + Object function = stages.length>0 ? lookup(stages[0], source) : null; if (function == null) { return null; } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/BeanFactoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/BeanFactoryFunctionCatalogTests.java index 6483836d7..db13e1498 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/BeanFactoryFunctionCatalogTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/BeanFactoryFunctionCatalogTests.java @@ -54,6 +54,13 @@ public class BeanFactoryFunctionCatalogTests { assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4"); } + @Test + public void lookupNonExistentConsumerWithEmptyName() { + processor.register(new FunctionRegistration<>(new Foos()).names("foos")); + Consumer> foos = processor.lookupConsumer(""); + assertThat(foos).isNull(); + } + @Test public void composeFunction() { processor.register(new FunctionRegistration<>(new Foos()).names("foos")); diff --git a/spring-cloud-function-samples/function-sample/src/main/resources/application.properties b/spring-cloud-function-samples/function-sample/src/main/resources/application.properties index a579479d7..7cadfc1fb 100644 --- a/spring-cloud-function-samples/function-sample/src/main/resources/application.properties +++ b/spring-cloud-function-samples/function-sample/src/main/resources/application.properties @@ -1 +1,2 @@ +spring.cloud.function.stream.endpoint: uppercase spring.cloud.function.scan.packages: com.example.functions \ No newline at end of file diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java index e36a4a193..b9b7dafa2 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java @@ -131,16 +131,14 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto } private FluxMessageProcessor select(Message input) { - String name = defaultEndpoint; - if (name != null) { - name = stash(name); + String name = null; + if (input.getHeaders().containsKey(StreamConfigurationProperties.ROUTE_KEY)) { + String key = (String) input.getHeaders() + .get(StreamConfigurationProperties.ROUTE_KEY); + name = stash(key); } - if (name == null) { - if (input.getHeaders().containsKey(StreamConfigurationProperties.ROUTE_KEY)) { - String key = (String) input.getHeaders() - .get(StreamConfigurationProperties.ROUTE_KEY); - name = stash(key); - } + if (name==null && defaultEndpoint != null) { + name = stash(defaultEndpoint); } if (name == null) { Set names = new LinkedHashSet<>(functionCatalog.getFunctionNames()); diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingExplicitEndpointTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingExplicitEndpointTests.java index 5340cec94..cd21d175d 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingExplicitEndpointTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/mixed/PojoStreamingExplicitEndpointTests.java @@ -16,7 +16,10 @@ package org.springframework.cloud.function.stream.mixed; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -26,6 +29,7 @@ import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.function.stream.StreamConfigurationProperties; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.cloud.stream.test.binder.MessageCollector; import org.springframework.context.annotation.Bean; @@ -49,9 +53,12 @@ public class PojoStreamingExplicitEndpointTests { @Autowired MessageCollector messageCollector; + + @Autowired + StreamingFunctionApplication app; @Test - public void test() throws Exception { + public void testDefaultEndpoint() throws Exception { processor.input() .send(MessageBuilder.withPayload("{\"name\":\"hello\"}").build()); Message result = messageCollector.forChannel(processor.output()).poll(1000, @@ -59,8 +66,17 @@ public class PojoStreamingExplicitEndpointTests { assertThat(result.getPayload()).isInstanceOf(Foo.class); } + @Test + public void testRoutingBeatsDefaultEndpoint() throws Exception { + processor.input() + .send(MessageBuilder.withPayload("{\"name\":\"hello\"}").setHeader(StreamConfigurationProperties.ROUTE_KEY, "sink").build()); + assertThat(app.foos).hasSize(1); + } + @SpringBootApplication public static class StreamingFunctionApplication { + + private List foos = new ArrayList<>(); @Bean public Function uppercase() { @@ -72,6 +88,11 @@ public class PojoStreamingExplicitEndpointTests { return () -> new Foo("world"); } + @Bean + public Consumer sink() { + return foo -> foos.add(foo); + } + } protected static class Foo {