Tentatively fix streaming issues

* Assume that the Function returned by FunctionCatalog
  is already wrapped as Function<Flux<?>,Flux<?>>
This commit is contained in:
Marius Bogoevici
2017-04-26 15:04:32 -04:00
committed by Dave Syer
parent f02f2eaf95
commit e117cfd5bd
4 changed files with 159 additions and 15 deletions

View File

@@ -89,7 +89,7 @@ public class StreamConfiguration {
public AbstractFunctionInvoker<?, ?> invoker(FunctionCatalog registry, FunctionInspector functionInspector,
@Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) {
String name = properties.getEndpoint();
Function<Object, Object> function = registry.lookupFunction(name);
Function<Flux<?>, Flux<?>> function = registry.lookupFunction(name);
Assert.notNull(function, "no such function: " + name);
return new StreamListeningFunctionInvoker(name, function, functionInspector,
compositeMessageConverterFactory);

View File

@@ -23,8 +23,6 @@ import reactor.core.publisher.Flux;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.invoker.AbstractFunctionInvoker;
import org.springframework.cloud.function.support.FluxFunction;
import org.springframework.cloud.function.support.FunctionUtils;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
@@ -32,7 +30,6 @@ import org.springframework.cloud.stream.converter.CompositeMessageConverterFacto
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
/**
* @author Mark Fisher
@@ -51,9 +48,9 @@ public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker<Flux
private Class<?> inputType;
public StreamListeningFunctionInvoker(String name, Function<?, ?> function, FunctionInspector functionInspector,
public StreamListeningFunctionInvoker(String name, Function<Flux<?>, Flux<?>> function, FunctionInspector functionInspector,
CompositeMessageConverterFactory converterFactory) {
super(wrapIfNecessary(function));
super(function);
this.name = name;
this.functionInspector = functionInspector;
this.converterFactory = converterFactory;
@@ -71,15 +68,6 @@ public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker<Flux
return this.doInvoke(input.map(convertInput()));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private static Function<Flux<?>, Flux<?>> wrapIfNecessary(Function function) {
Assert.notNull(function, "Function must not be null");
if (!FunctionUtils.isFluxFunction(function)) {
function = new FluxFunction(function);
}
return function;
}
private Function<Message<?>, Object> convertInput() {
return m -> {
if (this.inputType.isAssignableFrom(m.getPayload().getClass())) {

View File

@@ -0,0 +1,87 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.function;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FluxPojoStreamingFunctionTests.StreamingFunctionApplication.class, properties = {
"spring.cloud.stream.bindings.input.destination=data-in",
"spring.cloud.stream.bindings.output.destination=data-out", "spring.cloud.function.stream.endpoint=uppercase" })
public class FluxPojoStreamingFunctionTests {
@Autowired
Processor processor;
@Autowired
MessageCollector messageCollector;
@Test
public void test() throws Exception {
processor.input().send(MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build());
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS);
assertThat(result.getPayload()).isInstanceOf(Foo.class);
}
@SpringBootApplication
public static class StreamingFunctionApplication {
@Bean
public Function<Flux<Foo>, Flux<Foo>> uppercase() {
return foos -> foos.map(f -> new Foo(f.getName().toUpperCase()));
}
}
protected static class Foo {
private String name;
Foo() {
}
public Foo(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

View File

@@ -0,0 +1,69 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.function;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FluxStreamingFunctionTests.StreamingFunctionApplication.class, properties = {
"spring.cloud.stream.bindings.input.destination=data-in",
"spring.cloud.stream.bindings.output.destination=data-out",
"spring.cloud.function.stream.endpoint=uppercase" })
public class FluxStreamingFunctionTests {
@Autowired
Processor processor;
@Autowired
MessageCollector messageCollector;
@Test
public void test() throws Exception {
processor.input().send(MessageBuilder.withPayload("foo").build());
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS);
assertThat(result.getPayload()).isEqualTo("FOO");
}
@SpringBootApplication
public static class StreamingFunctionApplication {
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return f-> f.map(s -> s.toUpperCase());
}
}
}