Strangle old inspector methods

Fixes gh-81
This commit is contained in:
Dave Syer
2017-07-12 14:29:44 +01:00
parent d7d49858f6
commit c8646d64d8
12 changed files with 123 additions and 186 deletions

View File

@@ -87,13 +87,14 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto
.containsKey(StreamConfigurationProperties.ROUTE_KEY)) {
String key = (String) input.getHeaders()
.get(StreamConfigurationProperties.ROUTE_KEY);
if (functionCatalog.lookupFunction(key) != null) {
if (functionCatalog.lookupConsumer(key) != null) {
return key;
}
}
else {
for (String candidate : names) {
Class<?> inputType = functionInspector.getInputType(candidate);
Class<?> inputType = functionInspector
.getInputType(functionCatalog.lookupConsumer(candidate));
Object value = this.converter.fromMessage(input, inputType);
if (value != null && inputType.isInstance(value)) {
name = candidate;
@@ -110,7 +111,8 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto
}
private Function<Message<?>, Object> convertInput(String name) {
Class<?> inputType = functionInspector.getInputType(name);
Class<?> inputType = functionInspector
.getInputType(functionCatalog.lookupConsumer(name));
return m -> {
if (Message.class.isAssignableFrom(inputType)) {
return m;

View File

@@ -18,12 +18,11 @@ package org.springframework.cloud.function.stream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.beans.factory.SmartInitializingSingleton;
@@ -85,17 +84,21 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
input.groupBy(this::select).flatMap(group -> group.key().process(group)));
}
// TODO: the routing key could be added here, but really it should be added in
// Spring Cloud Stream
// (https://github.com/spring-cloud/spring-cloud-stream/issues/1010)
private Flux<Message<?>> function(String name, Flux<Message<?>> flux) {
// TODO: the routing key could be added here, but really it should be added in
// Spring Cloud Stream
// (https://github.com/spring-cloud/spring-cloud-stream/issues/1010)
AtomicReference<Map<String, Object>> headers = new AtomicReference<Map<String, Object>>(
new LinkedHashMap<>());
return ((Flux<?>) functionCatalog.lookupFunction(name).apply(flux.map(message -> {
Object applied = convertInput(name).apply(message);
headers.set(message.getHeaders());
return applied;
}))).map(result -> message(result, headers.get()));
Function<Object, Flux<?>> function = functionCatalog.lookupFunction(name);
return flux.publish(values -> {
Flux<?> result = function
.apply(values.map(message -> convertInput(function).apply(message)));
Flux<Map<String, Object>> aggregate = headers(values);
return result.withLatestFrom(aggregate, (p, m) -> message(p, m));
});
}
private Flux<Map<String, Object>> headers(Flux<Message<?>> flux) {
return flux.map(message -> message.getHeaders());
}
private Message<?> message(Object result, Map<String, Object> headers) {
@@ -104,8 +107,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
}
private Flux<Message<?>> consumer(String name, Flux<Message<?>> flux) {
functionCatalog.lookupConsumer(name)
.accept(flux.map(message -> convertInput(name).apply(message)));
Consumer<Object> consumer = functionCatalog.lookupConsumer(name);
consumer.accept(flux.map(message -> convertInput(consumer).apply(message)));
return Flux.empty();
}
@@ -149,7 +152,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
}
else {
for (String candidate : names) {
Class<?> inputType = functionInspector.getInputType(candidate);
Class<?> inputType = functionInspector
.getInputType(functionCatalog.lookupFunction(candidate));
Object value = this.converter.fromMessage(input, inputType);
if (value != null && inputType.isInstance(value)) {
matches.add(candidate);
@@ -185,20 +189,20 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
return null;
}
private Function<Message<?>, Object> convertInput(String name) {
Class<?> inputType = functionInspector.getInputType(name);
private Function<Message<?>, Object> convertInput(Object function) {
Class<?> inputType = functionInspector.getInputType(function);
return m -> {
if (functionInspector.isMessage(name)) {
return MessageBuilder.withPayload(convertPayload(name, inputType, m))
if (functionInspector.isMessage(function)) {
return MessageBuilder.withPayload(convertPayload(inputType, m))
.copyHeaders(m.getHeaders()).build();
}
else {
return convertPayload(name, inputType, m);
return convertPayload(inputType, m);
}
};
}
private Object convertPayload(String name, Class<?> inputType, Message<?> m) {
private Object convertPayload(Class<?> inputType, Message<?> m) {
if (inputType.isAssignableFrom(m.getPayload().getClass())) {
return m.getPayload();
}