Add conversion support for Stream

This commit is contained in:
Marius Bogoevici
2017-04-24 16:49:40 -04:00
committed by Dave Syer
parent 4686e450b1
commit 4a1972dcf1
5 changed files with 194 additions and 31 deletions

View File

@@ -31,10 +31,12 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.invoker.AbstractFunctionInvoker;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
@@ -42,6 +44,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.ConfigurationCondition;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
@@ -50,6 +53,7 @@ import reactor.core.publisher.Flux;
/**
* @author Mark Fisher
* @author Marius Bogoevici
*/
@EnableConfigurationProperties(StreamConfigurationProperties.class)
@ConditionalOnClass({ Binder.class, AbstractFunctionInvoker.class })
@@ -82,11 +86,13 @@ public class StreamConfiguration {
@Bean
@ConditionalOnProperty("spring.cloud.stream.bindings.input.destination")
public AbstractFunctionInvoker<?, ?> invoker(FunctionCatalog registry) {
public AbstractFunctionInvoker<?, ?> invoker(FunctionCatalog registry, FunctionInspector functionInspector,
@Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) {
String name = properties.getEndpoint();
Function<Object, Object> function = registry.lookupFunction(name);
Assert.notNull(function, "no such function: " + name);
return new StreamListeningFunctionInvoker(function);
return new StreamListeningFunctionInvoker(name, function, functionInspector,
compositeMessageConverterFactory);
}
}
@@ -99,10 +105,12 @@ public class StreamConfiguration {
@Bean
@ConditionalOnProperty("spring.cloud.stream.bindings.input.destination")
public StreamListeningConsumerInvoker<Object> invoker(FunctionCatalog registry) {
public StreamListeningConsumerInvoker invoker(FunctionCatalog registry, FunctionInspector functionInspector,
@Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) {
String name = properties.getEndpoint();
Consumer<Flux<Object>> consumer = registry.lookupConsumer(name);
return new StreamListeningConsumerInvoker<Object>(consumer);
Consumer<Flux<?>> consumer = registry.lookupConsumer(name);
return new StreamListeningConsumerInvoker(name, consumer, functionInspector,
compositeMessageConverterFactory);
}
}
@@ -137,10 +145,8 @@ public class StreamConfiguration {
}
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context,
AnnotatedTypeMetadata metadata) {
String functionName = context.getEnvironment()
.getProperty("spring.cloud.function.stream.endpoint");
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) {
String functionName = context.getEnvironment().getProperty("spring.cloud.function.stream.endpoint");
if (!StringUtils.hasText(functionName)) {
return ConditionOutcome.noMatch("no endpoint function name available");
}
@@ -151,11 +157,9 @@ public class StreamConfiguration {
}
Class<?> beanType = context.getBeanFactory().getType(functionName);
if (type.isAssignableFrom(beanType)) {
return ConditionOutcome
.match(String.format("bean '%s' is a %s", functionName, type));
return ConditionOutcome.match(String.format("bean '%s' is a %s", functionName, type));
}
return ConditionOutcome
.noMatch(String.format("bean '%s' is not a %s", functionName, type));
return ConditionOutcome.noMatch(String.format("bean '%s' is not a %s", functionName, type));
}
@Override

View File

@@ -17,26 +17,64 @@
package org.springframework.cloud.function.stream;
import java.util.function.Consumer;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
/**
* @author Mark Fisher
* @author Marius Bogoevici
*/
public class StreamListeningConsumerInvoker<T> {
public class StreamListeningConsumerInvoker implements SmartInitializingSingleton {
private final Consumer<Flux<T>> consumer;
private final Consumer<Flux<?>> consumer;
public StreamListeningConsumerInvoker(Consumer<Flux<T>> consumer) {
private final String name;
private final FunctionInspector functionInspector;
private final CompositeMessageConverterFactory converterFactory;
private MessageConverter converter;
private Class<?> inputType;
public StreamListeningConsumerInvoker(String name, Consumer<Flux<?>> consumer, FunctionInspector functionInspector,
CompositeMessageConverterFactory converterFactory) {
this.consumer = consumer;
this.name = name;
this.functionInspector = functionInspector;
this.converterFactory = converterFactory;
}
@Override
public void afterSingletonsInstantiated() {
this.converter = this.converterFactory.getMessageConverterForAllRegistered();
this.inputType = this.functionInspector.getInputType(this.name);
}
@StreamListener
public void handle(@Input(Processor.INPUT) Flux<T> input) {
this.consumer.accept(input);
public void handle(@Input(Processor.INPUT) Flux<Message<?>> input) {
this.consumer.accept(input.map(convertInput()));
}
private Function<Message<?>, Object> convertInput() {
return m -> {
if (this.inputType.isAssignableFrom(m.getPayload().getClass())) {
return m.getPayload();
}
else {
return converter.fromMessage(m, this.inputType);
}
};
}
}

View File

@@ -18,31 +18,57 @@ package org.springframework.cloud.function.stream;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.invoker.AbstractFunctionInvoker;
import org.springframework.cloud.function.support.FluxFunction;
import org.springframework.cloud.function.support.FunctionUtils;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
/**
* @author Mark Fisher
* @author Marius Bogoevici
*/
public class StreamListeningFunctionInvoker
extends AbstractFunctionInvoker<Flux<?>, Flux<?>> {
public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker<Flux<?>, Flux<?>>
implements SmartInitializingSingleton {
public StreamListeningFunctionInvoker(Function<?, ?> function) {
private final String name;
private final FunctionInspector functionInspector;
private final CompositeMessageConverterFactory converterFactory;
private MessageConverter converter;
private Class<?> inputType;
public StreamListeningFunctionInvoker(String name, Function<?, ?> function, FunctionInspector functionInspector,
CompositeMessageConverterFactory converterFactory) {
super(wrapIfNecessary(function));
this.name = name;
this.functionInspector = functionInspector;
this.converterFactory = converterFactory;
}
@Override
public void afterSingletonsInstantiated() {
this.converter = this.converterFactory.getMessageConverterForAllRegistered();
this.inputType = this.functionInspector.getInputType(this.name);
}
@StreamListener
@Output(Processor.OUTPUT)
public Flux<?> handle(@Input(Processor.INPUT) Flux<?> input) {
return this.doInvoke(input);
public Flux<?> handle(@Input(Processor.INPUT) Flux<Message<?>> input) {
return this.doInvoke(input.map(convertInput()));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -53,4 +79,15 @@ public class StreamListeningFunctionInvoker
}
return function;
}
private Function<Message<?>, Object> convertInput() {
return m -> {
if (this.inputType.isAssignableFrom(m.getPayload().getClass())) {
return m.getPayload();
}
else {
return this.converter.fromMessage(m, this.inputType);
}
};
}
}

View File

@@ -0,0 +1,86 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.function;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PojoStreamingFunctionTests.StreamingFunctionApplication.class, properties = {
"spring.cloud.stream.bindings.input.destination=data-in",
"spring.cloud.stream.bindings.output.destination=data-out", "spring.cloud.function.stream.endpoint=uppercase" })
public class PojoStreamingFunctionTests {
@Autowired
Processor processor;
@Autowired
MessageCollector messageCollector;
@Test
public void test() throws Exception {
processor.input().send(MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build());
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS);
assertThat(result.getPayload()).isInstanceOf(Foo.class);
}
@SpringBootApplication
public static class StreamingFunctionApplication {
@Bean
public Function<Foo, Foo> uppercase() {
return f -> new Foo(f.getName().toUpperCase());
}
}
protected static class Foo {
private String name;
Foo() {
}
public Foo(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}