From e117cfd5bde7ddd211461782aa66c8df8a48cf42 Mon Sep 17 00:00:00 2001 From: Marius Bogoevici Date: Wed, 26 Apr 2017 15:04:32 -0400 Subject: [PATCH] Tentatively fix streaming issues * Assume that the Function returned by FunctionCatalog is already wrapped as Function,Flux> --- .../function/stream/StreamConfiguration.java | 2 +- .../StreamListeningFunctionInvoker.java | 16 +--- .../FluxPojoStreamingFunctionTests.java | 87 +++++++++++++++++++ .../function/FluxStreamingFunctionTests.java | 69 +++++++++++++++ 4 files changed, 159 insertions(+), 15 deletions(-) create mode 100644 spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java create mode 100644 spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxStreamingFunctionTests.java diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java index b2ca04ecc..59f535177 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java @@ -89,7 +89,7 @@ public class StreamConfiguration { public AbstractFunctionInvoker invoker(FunctionCatalog registry, FunctionInspector functionInspector, @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { String name = properties.getEndpoint(); - Function function = registry.lookupFunction(name); + Function, Flux> function = registry.lookupFunction(name); Assert.notNull(function, "no such function: " + name); return new StreamListeningFunctionInvoker(name, function, functionInspector, compositeMessageConverterFactory); diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java index 75b8c8411..5b4ddf93b 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java @@ -23,8 +23,6 @@ import reactor.core.publisher.Flux; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.cloud.function.context.FunctionInspector; import org.springframework.cloud.function.invoker.AbstractFunctionInvoker; -import org.springframework.cloud.function.support.FluxFunction; -import org.springframework.cloud.function.support.FunctionUtils; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; @@ -32,7 +30,6 @@ import org.springframework.cloud.stream.converter.CompositeMessageConverterFacto import org.springframework.cloud.stream.messaging.Processor; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; -import org.springframework.util.Assert; /** * @author Mark Fisher @@ -51,9 +48,9 @@ public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker inputType; - public StreamListeningFunctionInvoker(String name, Function function, FunctionInspector functionInspector, + public StreamListeningFunctionInvoker(String name, Function, Flux> function, FunctionInspector functionInspector, CompositeMessageConverterFactory converterFactory) { - super(wrapIfNecessary(function)); + super(function); this.name = name; this.functionInspector = functionInspector; this.converterFactory = converterFactory; @@ -71,15 +68,6 @@ public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker, Flux> wrapIfNecessary(Function function) { - Assert.notNull(function, "Function must not be null"); - if (!FunctionUtils.isFluxFunction(function)) { - function = new FluxFunction(function); - } - return function; - } - private Function, Object> convertInput() { return m -> { if (this.inputType.isAssignableFrom(m.getPayload().getClass())) { diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java new file mode 100644 index 000000000..728a21342 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionTests.java @@ -0,0 +1,87 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.junit.Test; +import org.junit.runner.RunWith; +import reactor.core.publisher.Flux; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Marius Bogoevici + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = FluxPojoStreamingFunctionTests.StreamingFunctionApplication.class, properties = { + "spring.cloud.stream.bindings.input.destination=data-in", + "spring.cloud.stream.bindings.output.destination=data-out", "spring.cloud.function.stream.endpoint=uppercase" }) +public class FluxPojoStreamingFunctionTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Test + public void test() throws Exception { + processor.input().send(MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS); + assertThat(result.getPayload()).isInstanceOf(Foo.class); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Bean + public Function, Flux> uppercase() { + return foos -> foos.map(f -> new Foo(f.getName().toUpperCase())); + } + } + + protected static class Foo { + private String name; + + Foo() { + } + + public Foo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxStreamingFunctionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxStreamingFunctionTests.java new file mode 100644 index 000000000..902175248 --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxStreamingFunctionTests.java @@ -0,0 +1,69 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.junit.Test; +import org.junit.runner.RunWith; +import reactor.core.publisher.Flux; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Marius Bogoevici + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = FluxStreamingFunctionTests.StreamingFunctionApplication.class, properties = { + "spring.cloud.stream.bindings.input.destination=data-in", + "spring.cloud.stream.bindings.output.destination=data-out", + "spring.cloud.function.stream.endpoint=uppercase" }) +public class FluxStreamingFunctionTests { + + @Autowired + Processor processor; + + @Autowired + MessageCollector messageCollector; + + @Test + public void test() throws Exception { + processor.input().send(MessageBuilder.withPayload("foo").build()); + Message result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS); + assertThat(result.getPayload()).isEqualTo("FOO"); + } + + @SpringBootApplication + public static class StreamingFunctionApplication { + + @Bean + public Function, Flux> uppercase() { + return f-> f.map(s -> s.toUpperCase()); + } + } +}