Extract common logic in invokers into base class
This commit is contained in:
@@ -49,11 +49,13 @@ As the table above shows the behaviour of the endpoint depends on the method and
|
||||
|
||||
Functions and consumers that are declared with input and output in `Message<?>` will see the request headers on the input messages, and the output message headers will be converted to HTTP headers.
|
||||
|
||||
When POSTing text the response format might be different with Spring Boot 2.0 and older versions, depending on the content negotiation (provide content type and accpt headers for the best results).
|
||||
|
||||
== Standalone Streaming Applications
|
||||
|
||||
To send or receive messages from a broker (such as RabbitMQ or Kafka) you can use the `spring-cloud-function-stream` adapter. Add the adapter to your classpath along with the appropriate binder from Spring Cloud Stream. The adapter will bind to the message broker as a `Processor` (input and output streams) unless the user explicitly disables one or the other using `spring.cloud.function.stream.{source,sink}.enabled=false`.
|
||||
|
||||
An incoming message is routed to a function (or consumer). If there is only one, then the choice is obvious. If there are multiple functions that can accept an incoming message, the message is inspected to see if there is a `stream_routekey` header containing the name of a function. The header is also added to outgoing messages from a supplier. Messages with no route key can be routed exclusively to a function or consumer by specifying `spring.cloud.function.stream.{processor,sink}.name`. A single supplier can be chosen for output messages (if more than one is available) using the `spring.cloud.function.stream.supplier.name`. Routing headers or function names can be composed using a comma or pipe separated name.
|
||||
An incoming message is routed to a function (or consumer). If there is only one, then the choice is obvious. If there are multiple functions that can accept an incoming message, the message is inspected to see if there is a `stream_routekey` header containing the name of a function. Routing headers or function names can be composed using a comma- or pipe-separated name. The header is also added to outgoing messages from a supplier. Messages with no route key can be routed exclusively to a function or consumer by specifying `spring.cloud.function.stream.{processor,sink}.name`. If a single function cannot be identified to process an incoming message there will be an error, unless you set `spring.cloud.function.stream.shared=true`, in which case such messages will be sent to all compatible functions. A single supplier can be chosen for output messages from a supplier (if more than one is available) using the `spring.cloud.function.stream.source.name`.
|
||||
|
||||
NOTE: some binders will fail on startup if the message broker is not available and the function catalog contains suppliers that immediately produce messages when accessed. You can switch off the automatic publishing from suppliers on startup using the `spring.cloud.function.strean.supplier.enabled=false` flag.
|
||||
|
||||
|
||||
@@ -0,0 +1,236 @@
|
||||
/*
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.function.stream.config;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.springframework.beans.factory.SmartInitializingSingleton;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.catalog.FunctionInspector;
|
||||
import org.springframework.cloud.function.context.message.MessageUtils;
|
||||
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* @author Dave Syer
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractStreamListeningInvoker
|
||||
implements SmartInitializingSingleton {
|
||||
|
||||
private final FunctionInspector functionInspector;
|
||||
|
||||
private final FunctionCatalog functionCatalog;
|
||||
|
||||
private final CompositeMessageConverterFactory converterFactory;
|
||||
|
||||
private MessageConverter converter;
|
||||
|
||||
private static final Object UNCONVERTED = new Object();
|
||||
|
||||
private final String defaultRoute;
|
||||
|
||||
private final Map<String, FluxMessageProcessor> processors = new HashMap<>();
|
||||
|
||||
private static final FluxMessageProcessor NOENDPOINT = flux -> Flux.empty();
|
||||
|
||||
private boolean share;
|
||||
|
||||
public AbstractStreamListeningInvoker(FunctionCatalog functionCatalog,
|
||||
FunctionInspector functionInspector,
|
||||
CompositeMessageConverterFactory converterFactory, String defaultRoute,
|
||||
boolean share) {
|
||||
this.functionCatalog = functionCatalog;
|
||||
this.functionInspector = functionInspector;
|
||||
this.converterFactory = converterFactory;
|
||||
this.defaultRoute = defaultRoute;
|
||||
this.share = share;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterSingletonsInstantiated() {
|
||||
this.converter = this.converterFactory.getMessageConverterForAllRegistered();
|
||||
}
|
||||
|
||||
protected Mono<Void> consumer(String name, Flux<Message<?>> flux) {
|
||||
Consumer<Object> consumer = functionCatalog.lookup(Consumer.class, name);
|
||||
flux = flux.publish().refCount(2);
|
||||
// The consumer will subscribe to the input flux, so we need to listen separately
|
||||
consumer.accept(flux.map(message -> convertInput(consumer).apply(message))
|
||||
.filter(transformed -> transformed != UNCONVERTED));
|
||||
return flux.then(Mono.empty());
|
||||
}
|
||||
|
||||
protected Flux<Message<?>> function(String name, Flux<Message<?>> flux) {
|
||||
Function<Object, Flux<?>> function = functionCatalog.lookup(Function.class, name);
|
||||
return flux.publish(values -> {
|
||||
Flux<?> result = function
|
||||
.apply(values.map(message -> convertInput(function).apply(message)));
|
||||
if (this.functionInspector.isMessage(function)) {
|
||||
result = result.map(message -> MessageUtils.unpack(function, message));
|
||||
}
|
||||
Flux<Map<String, Object>> aggregate = headers(values);
|
||||
return aggregate.withLatestFrom(result,
|
||||
(map, payload) -> message(map, payload));
|
||||
});
|
||||
}
|
||||
|
||||
private Flux<Map<String, Object>> headers(Flux<Message<?>> flux) {
|
||||
return flux.map(message -> message.getHeaders());
|
||||
}
|
||||
|
||||
private Message<?> message(Map<String, Object> headers, Object result) {
|
||||
return result instanceof Message
|
||||
? MessageBuilder.fromMessage((Message<?>) result)
|
||||
.copyHeadersIfAbsent(headers).build()
|
||||
: MessageBuilder.withPayload(result).copyHeadersIfAbsent(headers).build();
|
||||
}
|
||||
|
||||
private Function<Message<?>, Object> convertInput(Object function) {
|
||||
Class<?> inputType = functionInspector.getInputType(function);
|
||||
return m -> {
|
||||
if (functionInspector.isMessage(function)) {
|
||||
return MessageUtils.create(function, convertPayload(inputType, m),
|
||||
m.getHeaders());
|
||||
}
|
||||
else {
|
||||
return convertPayload(inputType, m);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected Object convertPayload(Class<?> inputType, Message<?> m) {
|
||||
Object result;
|
||||
if (inputType.isAssignableFrom(m.getPayload().getClass())) {
|
||||
result = m.getPayload();
|
||||
}
|
||||
else {
|
||||
result = this.converter.fromMessage(m, inputType);
|
||||
}
|
||||
if (result == null) {
|
||||
result = UNCONVERTED;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private Flux<Message<?>> balance(List<String> names, Flux<Message<?>> flux) {
|
||||
if (names.isEmpty()) {
|
||||
return Flux.empty();
|
||||
}
|
||||
flux = flux.hide();
|
||||
Flux<Message<?>> result = Flux.empty();
|
||||
if (names.size() > 1) {
|
||||
if (this.share) {
|
||||
flux = flux.publish().refCount(names.size());
|
||||
}
|
||||
else {
|
||||
return Flux.error(new IllegalStateException(
|
||||
"Multiple matches and share disabled: " + names));
|
||||
}
|
||||
}
|
||||
for (String name : names) {
|
||||
if (functionCatalog.lookup(Consumer.class, name) != null) {
|
||||
result = result.mergeWith(
|
||||
consumer(name, flux).thenMany(Flux.<Message<?>>empty()));
|
||||
}
|
||||
else {
|
||||
result = result.mergeWith(function(name, flux));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
protected FluxMessageProcessor select(Message<?> input) {
|
||||
FluxMessageProcessor processor = null;
|
||||
if (input.getHeaders().containsKey(StreamConfigurationProperties.ROUTE_KEY)) {
|
||||
String key = (String) input.getHeaders()
|
||||
.get(StreamConfigurationProperties.ROUTE_KEY);
|
||||
processor = stash(key);
|
||||
}
|
||||
if (processor == null && defaultRoute != null) {
|
||||
processor = stash(defaultRoute);
|
||||
}
|
||||
if (processor == null) {
|
||||
Set<String> names = new LinkedHashSet<>(
|
||||
functionCatalog.getNames(Function.class));
|
||||
names.addAll(functionCatalog.getNames(Consumer.class));
|
||||
List<String> matches = new ArrayList<>();
|
||||
if (names.size() == 1) {
|
||||
String key = names.iterator().next();
|
||||
processor = stash(key);
|
||||
}
|
||||
else {
|
||||
for (String candidate : names) {
|
||||
Object function = functionCatalog.lookup(Function.class, candidate);
|
||||
if (function == null) {
|
||||
function = functionCatalog.lookup(Consumer.class, candidate);
|
||||
}
|
||||
if (function == null) {
|
||||
continue;
|
||||
}
|
||||
Class<?> inputType = functionInspector.getInputType(function);
|
||||
Object value = convertPayload(inputType, input);
|
||||
if (value != null && inputType.isInstance(value)) {
|
||||
matches.add(candidate);
|
||||
}
|
||||
}
|
||||
if (matches.size() == 1) {
|
||||
processor = stash(matches.iterator().next());
|
||||
}
|
||||
else {
|
||||
return flux -> balance(matches, flux);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (processor == null) {
|
||||
return NOENDPOINT;
|
||||
}
|
||||
return processor;
|
||||
}
|
||||
|
||||
private FluxMessageProcessor stash(String key) {
|
||||
if (functionCatalog.lookup(Function.class, key) != null) {
|
||||
if (!processors.containsKey(key)) {
|
||||
processors.put(key, flux -> function(key, flux));
|
||||
}
|
||||
return processors.get(key);
|
||||
}
|
||||
else if (functionCatalog.lookup(Consumer.class, key) != null) {
|
||||
if (!processors.containsKey(key)) {
|
||||
processors.put(key,
|
||||
flux -> consumer(key, flux).thenMany(Flux.<Message<?>>empty()));
|
||||
}
|
||||
return processors.get(key);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
interface FluxMessageProcessor {
|
||||
Flux<Message<?>> process(Flux<Message<?>> flux);
|
||||
}
|
||||
}
|
||||
@@ -16,66 +16,26 @@
|
||||
|
||||
package org.springframework.cloud.function.stream.config;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.springframework.beans.factory.SmartInitializingSingleton;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.catalog.FunctionInspector;
|
||||
import org.springframework.cloud.function.context.message.MessageUtils;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
|
||||
import org.springframework.cloud.stream.messaging.Processor;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* @author Dave Syer
|
||||
*/
|
||||
public class StreamListeningConsumerInvoker implements SmartInitializingSingleton {
|
||||
|
||||
private final FunctionInspector functionInspector;
|
||||
|
||||
private final FunctionCatalog functionCatalog;
|
||||
|
||||
private final CompositeMessageConverterFactory converterFactory;
|
||||
|
||||
private MessageConverter converter;
|
||||
|
||||
private final String defaultRoute;
|
||||
|
||||
private final Map<String, FluxMessageProcessor> processors = new HashMap<>();
|
||||
|
||||
private static final FluxMessageProcessor NOENDPOINT = flux -> Mono.empty();
|
||||
|
||||
private static final Object UNCONVERTED = new Object();
|
||||
|
||||
private boolean share;
|
||||
public class StreamListeningConsumerInvoker extends AbstractStreamListeningInvoker {
|
||||
|
||||
public StreamListeningConsumerInvoker(FunctionCatalog functionCatalog,
|
||||
FunctionInspector functionInspector,
|
||||
CompositeMessageConverterFactory converterFactory, String defaultRoute,
|
||||
boolean share) {
|
||||
this.functionCatalog = functionCatalog;
|
||||
this.functionInspector = functionInspector;
|
||||
this.converterFactory = converterFactory;
|
||||
this.defaultRoute = defaultRoute;
|
||||
this.share = share;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterSingletonsInstantiated() {
|
||||
this.converter = this.converterFactory.getMessageConverterForAllRegistered();
|
||||
super(functionCatalog, functionInspector, converterFactory, defaultRoute, share);
|
||||
}
|
||||
|
||||
@StreamListener
|
||||
@@ -84,117 +44,4 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto
|
||||
.subscribe();
|
||||
}
|
||||
|
||||
private Mono<Void> consumer(String name, Flux<Message<?>> flux) {
|
||||
Consumer<Object> consumer = functionCatalog.lookup(Consumer.class, name);
|
||||
consumer.accept(flux.map(message -> convertInput(consumer).apply(message))
|
||||
.filter(transformed -> transformed != UNCONVERTED));
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
private Mono<Void> balance(List<String> names, Flux<Message<?>> flux) {
|
||||
if (names.isEmpty()) {
|
||||
return Mono.empty();
|
||||
}
|
||||
Flux<?> result = Flux.empty();
|
||||
if (names.size() > 1) {
|
||||
if (this.share) {
|
||||
flux = flux.share();
|
||||
}
|
||||
else {
|
||||
return Mono.error(new IllegalStateException(
|
||||
"Multiple matches and share disabled: " + names));
|
||||
}
|
||||
}
|
||||
for (String name : names) {
|
||||
result = result.zipWith(consumer(name, flux));
|
||||
}
|
||||
return result.then();
|
||||
}
|
||||
|
||||
private FluxMessageProcessor select(Message<?> input) {
|
||||
String name = null;
|
||||
if (input.getHeaders().containsKey(StreamConfigurationProperties.ROUTE_KEY)) {
|
||||
String key = (String) input.getHeaders()
|
||||
.get(StreamConfigurationProperties.ROUTE_KEY);
|
||||
name = stash(key);
|
||||
}
|
||||
if (name == null && defaultRoute != null) {
|
||||
name = stash(defaultRoute);
|
||||
}
|
||||
if (name == null) {
|
||||
Set<String> names = new LinkedHashSet<>(
|
||||
functionCatalog.getNames(Consumer.class));
|
||||
List<String> matches = new ArrayList<>();
|
||||
if (names.size() == 1) {
|
||||
String key = names.iterator().next();
|
||||
name = stash(key);
|
||||
}
|
||||
else {
|
||||
for (String candidate : names) {
|
||||
Object function = functionCatalog.lookup(Consumer.class, candidate);
|
||||
if (function == null) {
|
||||
continue;
|
||||
}
|
||||
Class<?> inputType = functionInspector.getInputType(function);
|
||||
Object value = this.converter.fromMessage(input, inputType);
|
||||
if (value != null && inputType.isInstance(value)) {
|
||||
matches.add(candidate);
|
||||
}
|
||||
}
|
||||
if (matches.size() == 1) {
|
||||
name = stash(matches.iterator().next());
|
||||
}
|
||||
else {
|
||||
// TODO: do we really want this? Or maybe warn that it is happening?
|
||||
return flux -> balance(matches, flux);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (name == null) {
|
||||
return NOENDPOINT;
|
||||
}
|
||||
return processors.get(name);
|
||||
}
|
||||
|
||||
private String stash(String key) {
|
||||
if (functionCatalog.lookup(Consumer.class, key) != null) {
|
||||
if (!processors.containsKey(key)) {
|
||||
processors.put(key, flux -> consumer(key, flux));
|
||||
}
|
||||
return key;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private Function<Message<?>, Object> convertInput(Object function) {
|
||||
Class<?> inputType = functionInspector.getInputType(function);
|
||||
return m -> {
|
||||
if (functionInspector.isMessage(function)) {
|
||||
return MessageUtils.create(function, convertPayload(inputType, m),
|
||||
m.getHeaders());
|
||||
}
|
||||
else {
|
||||
return convertPayload(inputType, m);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private Object convertPayload(Class<?> inputType, Message<?> m) {
|
||||
Object result;
|
||||
if (inputType.isAssignableFrom(m.getPayload().getClass())) {
|
||||
result = m.getPayload();
|
||||
}
|
||||
else {
|
||||
result = this.converter.fromMessage(m, inputType);
|
||||
}
|
||||
if (result == null) {
|
||||
result = UNCONVERTED;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
interface FluxMessageProcessor {
|
||||
Mono<Void> process(Flux<Message<?>> flux);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -16,19 +16,8 @@
|
||||
|
||||
package org.springframework.cloud.function.stream.config;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.springframework.beans.factory.SmartInitializingSingleton;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.catalog.FunctionInspector;
|
||||
import org.springframework.cloud.function.context.message.MessageUtils;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.Output;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
@@ -36,8 +25,6 @@ import org.springframework.cloud.stream.converter.CompositeMessageConverterFacto
|
||||
import org.springframework.cloud.stream.messaging.Processor;
|
||||
import org.springframework.cloud.stream.reactive.FluxSender;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
@@ -46,40 +33,13 @@ import reactor.core.publisher.Mono;
|
||||
* @author Mark Fisher
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
public class StreamListeningFunctionInvoker implements SmartInitializingSingleton {
|
||||
|
||||
private final FunctionInspector functionInspector;
|
||||
|
||||
private final FunctionCatalog functionCatalog;
|
||||
|
||||
private final CompositeMessageConverterFactory converterFactory;
|
||||
|
||||
private MessageConverter converter;
|
||||
|
||||
private final String defaultRoute;
|
||||
|
||||
private final Map<String, FluxMessageProcessor> processors = new HashMap<>();
|
||||
|
||||
private static final FluxMessageProcessor NOENDPOINT = flux -> Flux.empty();
|
||||
|
||||
private static final Object UNCONVERTED = new Object();
|
||||
|
||||
private boolean share;
|
||||
public class StreamListeningFunctionInvoker extends AbstractStreamListeningInvoker {
|
||||
|
||||
public StreamListeningFunctionInvoker(FunctionCatalog functionCatalog,
|
||||
FunctionInspector functionInspector,
|
||||
CompositeMessageConverterFactory converterFactory, String defaultRoute,
|
||||
boolean share) {
|
||||
this.functionCatalog = functionCatalog;
|
||||
this.functionInspector = functionInspector;
|
||||
this.converterFactory = converterFactory;
|
||||
this.defaultRoute = defaultRoute;
|
||||
this.share = share;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterSingletonsInstantiated() {
|
||||
this.converter = this.converterFactory.getMessageConverterForAllRegistered();
|
||||
super(functionCatalog, functionInspector, converterFactory, defaultRoute, share);
|
||||
}
|
||||
|
||||
@StreamListener
|
||||
@@ -89,159 +49,4 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
|
||||
input.groupBy(this::select).flatMap(group -> group.key().process(group)));
|
||||
}
|
||||
|
||||
private Flux<Message<?>> function(String name, Flux<Message<?>> flux) {
|
||||
Function<Object, Flux<?>> function = functionCatalog.lookup(Function.class, name);
|
||||
return flux.publish(values -> {
|
||||
Flux<?> result = function
|
||||
.apply(values.map(message -> convertInput(function).apply(message)));
|
||||
if (this.functionInspector.isMessage(function)) {
|
||||
result = result.map(message -> MessageUtils.unpack(function, message));
|
||||
}
|
||||
Flux<Map<String, Object>> aggregate = headers(values);
|
||||
return aggregate.withLatestFrom(result,
|
||||
(map, payload) -> message(map, payload));
|
||||
});
|
||||
}
|
||||
|
||||
private Flux<Map<String, Object>> headers(Flux<Message<?>> flux) {
|
||||
return flux.map(message -> message.getHeaders());
|
||||
}
|
||||
|
||||
private Message<?> message(Map<String, Object> headers, Object result) {
|
||||
return result instanceof Message
|
||||
? MessageBuilder.fromMessage((Message<?>) result)
|
||||
.copyHeadersIfAbsent(headers).build()
|
||||
: MessageBuilder.withPayload(result).copyHeadersIfAbsent(headers).build();
|
||||
}
|
||||
|
||||
private Flux<Message<?>> consumer(String name, Flux<Message<?>> flux) {
|
||||
Consumer<Object> consumer = functionCatalog.lookup(Consumer.class, name);
|
||||
flux = flux.publish().refCount(2);
|
||||
// The consumer will subscribe to the input flux, so we need to listen separately
|
||||
consumer.accept(flux.map(message -> convertInput(consumer).apply(message))
|
||||
.filter(transformed -> transformed != UNCONVERTED));
|
||||
return flux.ignoreElements().flux();
|
||||
}
|
||||
|
||||
private Flux<Message<?>> balance(List<String> names, Flux<Message<?>> flux) {
|
||||
if (names.isEmpty()) {
|
||||
return Flux.empty();
|
||||
}
|
||||
flux = flux.hide();
|
||||
Flux<Message<?>> result = Flux.empty();
|
||||
if (names.size() > 1) {
|
||||
if (this.share) {
|
||||
flux = flux.publish().refCount(names.size());
|
||||
}
|
||||
else {
|
||||
return Flux.error(new IllegalStateException(
|
||||
"Multiple matches and share disabled: " + names));
|
||||
}
|
||||
}
|
||||
for (String name : names) {
|
||||
if (functionCatalog.lookup(Consumer.class, name) != null) {
|
||||
result = result.mergeWith(consumer(name, flux));
|
||||
}
|
||||
else {
|
||||
result = result.mergeWith(function(name, flux));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private FluxMessageProcessor select(Message<?> input) {
|
||||
String name = null;
|
||||
if (input.getHeaders().containsKey(StreamConfigurationProperties.ROUTE_KEY)) {
|
||||
String key = (String) input.getHeaders()
|
||||
.get(StreamConfigurationProperties.ROUTE_KEY);
|
||||
name = stash(key);
|
||||
}
|
||||
if (name == null && defaultRoute != null) {
|
||||
name = stash(defaultRoute);
|
||||
}
|
||||
if (name == null) {
|
||||
Set<String> names = new LinkedHashSet<>(
|
||||
functionCatalog.getNames(Function.class));
|
||||
names.addAll(functionCatalog.getNames(Consumer.class));
|
||||
List<String> matches = new ArrayList<>();
|
||||
if (names.size() == 1) {
|
||||
String key = names.iterator().next();
|
||||
name = stash(key);
|
||||
}
|
||||
else {
|
||||
for (String candidate : names) {
|
||||
Object function = functionCatalog.lookup(Function.class, candidate);
|
||||
if (function == null) {
|
||||
function = functionCatalog.lookup(Consumer.class, candidate);
|
||||
}
|
||||
if (function == null) {
|
||||
continue;
|
||||
}
|
||||
Class<?> inputType = functionInspector.getInputType(function);
|
||||
Object value = this.converter.fromMessage(input, inputType);
|
||||
if (value != null && inputType.isInstance(value)) {
|
||||
matches.add(candidate);
|
||||
}
|
||||
}
|
||||
if (matches.size() == 1) {
|
||||
name = stash(matches.iterator().next());
|
||||
}
|
||||
else {
|
||||
return flux -> balance(matches, flux);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (name == null) {
|
||||
return NOENDPOINT;
|
||||
}
|
||||
return processors.get(name);
|
||||
}
|
||||
|
||||
private String stash(String key) {
|
||||
if (functionCatalog.lookup(Function.class, key) != null) {
|
||||
if (!processors.containsKey(key)) {
|
||||
processors.put(key, flux -> function(key, flux));
|
||||
}
|
||||
return key;
|
||||
}
|
||||
else if (functionCatalog.lookup(Consumer.class, key) != null) {
|
||||
if (!processors.containsKey(key)) {
|
||||
processors.put(key, flux -> consumer(key, flux));
|
||||
}
|
||||
return key;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private Function<Message<?>, Object> convertInput(Object function) {
|
||||
Class<?> inputType = functionInspector.getInputType(function);
|
||||
return m -> {
|
||||
if (functionInspector.isMessage(function)) {
|
||||
return MessageUtils.create(function, convertPayload(inputType, m),
|
||||
m.getHeaders());
|
||||
}
|
||||
else {
|
||||
return convertPayload(inputType, m);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private Object convertPayload(Class<?> inputType, Message<?> m) {
|
||||
Object result;
|
||||
if (inputType.isAssignableFrom(m.getPayload().getClass())) {
|
||||
result = m.getPayload();
|
||||
}
|
||||
else {
|
||||
result = this.converter.fromMessage(m, inputType);
|
||||
}
|
||||
if (result == null) {
|
||||
result = UNCONVERTED;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
interface FluxMessageProcessor {
|
||||
Flux<Message<?>> process(Flux<Message<?>> flux);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,144 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.stream.mixed;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.function.stream.config.StreamConfigurationProperties;
|
||||
import org.springframework.cloud.stream.messaging.Processor;
|
||||
import org.springframework.cloud.stream.test.binder.MessageCollector;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = PojoStreamingNotSharedTests.StreamingFunctionApplication.class)
|
||||
public class PojoStreamingNotSharedTests {
|
||||
|
||||
@Autowired
|
||||
Processor processor;
|
||||
|
||||
@Autowired
|
||||
MessageCollector messageCollector;
|
||||
|
||||
@Autowired
|
||||
List<Bar> collector;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
collector.clear();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void balance() throws Exception {
|
||||
processor.input()
|
||||
.send(MessageBuilder.withPayload("{\"name\":\"hello\"}").build());
|
||||
processor.input()
|
||||
.send(MessageBuilder.withPayload("{\"name\":\"world\"}").build());
|
||||
assertThat(messageCollector.forChannel(processor.output())).isEmpty();
|
||||
assertThat(collector).hasSize(0);
|
||||
// There should be an error in the logs (sharing disabled by default)
|
||||
}
|
||||
|
||||
@Test
|
||||
public void routing() throws Exception {
|
||||
processor.input().send(MessageBuilder.withPayload("{\"name\":\"hello\"}")
|
||||
.setHeader(StreamConfigurationProperties.ROUTE_KEY, "uppercase").build());
|
||||
processor.input().send(MessageBuilder.withPayload("{\"name\":\"world\"}")
|
||||
.setHeader(StreamConfigurationProperties.ROUTE_KEY, "uppercase").build());
|
||||
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000,
|
||||
TimeUnit.MILLISECONDS);
|
||||
assertThat(result.getPayload()).isInstanceOf(Foo.class);
|
||||
// routing key sends messages to the function, not the consumer
|
||||
assertThat(collector).hasSize(0);
|
||||
}
|
||||
|
||||
@SpringBootApplication
|
||||
public static class StreamingFunctionApplication {
|
||||
|
||||
@Bean
|
||||
public Function<Foo, Foo> uppercase() {
|
||||
return f -> new Foo(f.getName().toUpperCase());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public List<Bar> collector() {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Consumer<Bar> sink(final List<Bar> list) {
|
||||
return s -> list.add(s);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected static class Foo {
|
||||
private String name;
|
||||
|
||||
Foo() {
|
||||
}
|
||||
|
||||
public Foo(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
}
|
||||
|
||||
protected static class Bar {
|
||||
private String name;
|
||||
|
||||
Bar() {
|
||||
}
|
||||
|
||||
public Bar(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user