diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java index 5f1eab4eb..59bc8e86e 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java @@ -48,6 +48,8 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto private final String[] names; + private static final String NOENDPOINT = "__NOENDPOINT__"; + public StreamListeningConsumerInvoker(FunctionCatalog functionCatalog, FunctionInspector functionInspector, CompositeMessageConverterFactory converterFactory, String defaultEndpoint, @@ -79,14 +81,23 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto private String select(Message input) { String name = defaultEndpoint; if (name == null) { - for (String candidate : names) { - Class inputType = functionInspector.getInputType(candidate); - if (this.converter.fromMessage(input, inputType) != null) { - name = candidate; - break; + if (names.length == 1) { + name = names[0]; + } + else { + for (String candidate : names) { + Class inputType = functionInspector.getInputType(candidate); + Object value = this.converter.fromMessage(input, inputType); + if (value != null && inputType.isInstance(value)) { + name = candidate; + break; + } } } } + if (name == null) { + return NOENDPOINT; + } return name; } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java index eb9853ec1..2c1352c2b 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java @@ -50,6 +50,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto private final String[] names; + private static final String NOENDPOINT = "__NOENDPOINT__"; + public StreamListeningFunctionInvoker(FunctionCatalog functionCatalog, FunctionInspector functionInspector, CompositeMessageConverterFactory converterFactory, String defaultEndpoint, @@ -82,14 +84,23 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto private String select(Message input) { String name = defaultEndpoint; if (name == null) { - for (String candidate : names) { - Class inputType = functionInspector.getInputType(candidate); - if (this.converter.fromMessage(input, inputType) != null) { - name = candidate; - break; + if (names.length == 1) { + name = names[0]; + } + else { + for (String candidate : names) { + Class inputType = functionInspector.getInputType(candidate); + Object value = this.converter.fromMessage(input, inputType); + if (value != null && inputType.isInstance(value)) { + name = candidate; + break; + } } } } + if (name == null) { + return NOENDPOINT; + } return name; } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionConversionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionConversionTests.java new file mode 100644 index 000000000..36a8e458a --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionConversionTests.java @@ -0,0 +1,162 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.converter.MessageConverterUtils; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.SerializationUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; + +/** + * @author Marius Bogoevici + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = FluxPojoStreamingFunctionConversionTests.StreamingFunctionApplication.class) +public class FluxPojoStreamingFunctionConversionTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Test + public void foo() throws Exception { + processor.input().send(MessageBuilder + .withPayload(SerializationUtils.serialize(new Foo("foo"))) + .setHeader("contentType", MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT) + .build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result.getPayload()).isInstanceOf(Foo.class); + } + + @Test + public void bar() throws Exception { + processor.input().send(MessageBuilder + .withPayload(SerializationUtils.serialize(new Bar("foo"))) + .setHeader("contentType", MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT) + .build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, + TimeUnit.MILLISECONDS); + assertThat(result.getPayload()).isInstanceOf(Bar.class); + } + + @Test + public void skip() throws Exception { + processor.input().send(MessageBuilder + .withPayload(SerializationUtils.serialize(new Spam("foo"))) + .setHeader("contentType", MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT) + .build()); + Message result = messageCollector.forChannel(processor.output()).poll(100, + TimeUnit.MILLISECONDS); + assertThat(result).isNull(); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Bean + public Function, Flux> uppercase() { + return foos -> foos.map(f -> new Foo(f.getName().toUpperCase())); + } + + @Bean + public Function, Flux> lowercase() { + return foos -> foos.map(f -> new Bar(f.getName().toUpperCase())); + } + + } + + @SuppressWarnings("serial") + protected static class Foo implements Serializable { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } + + @SuppressWarnings("serial") + protected static class Bar implements Serializable { + private String name; + + Bar() { + } + + public Bar(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } + + @SuppressWarnings("serial") + protected static class Spam implements Serializable { + private String name; + + Spam() { + } + + public Spam(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java index b8ac7a7a7..18deda6e6 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java @@ -21,7 +21,6 @@ import java.util.function.Function; import org.junit.Test; import org.junit.runner.RunWith; -import reactor.core.publisher.Flux; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -35,6 +34,8 @@ import org.springframework.test.context.junit4.SpringRunner; import static org.assertj.core.api.Assertions.assertThat; +import reactor.core.publisher.Flux; + /** * @author Marius Bogoevici */ @@ -62,6 +63,7 @@ public class FluxPojoStreamingFunctionTests { public Function, Flux> uppercase() { return foos -> foos.map(f -> new Foo(f.getName().toUpperCase())); } + } protected static class Foo {