GH-654 Initial support for non-SpEL based routing mechanism

This commit is contained in:
Oleg Zhurakousky
2021-02-22 18:57:07 +01:00
parent c01b25f2fa
commit 29115aeb64
6 changed files with 171 additions and 37 deletions

View File

@@ -0,0 +1,39 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context;
import org.reactivestreams.Publisher;
import org.springframework.messaging.Message;
/**
*
* @author Oleg Zhurakousky
* @since 3.1
*/
public interface MessageRoutingCallback {
default String route(Message<?> message, FunctionProperties functionProperties) {
// noop
return null;
}
default String route(Publisher<?> publisher, FunctionProperties functionProperties) {
//noop
return null;
}
}

View File

@@ -34,6 +34,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.function.json.GsonMapper;
@@ -116,8 +117,8 @@ public class ContextFunctionCatalogAutoConfiguration {
@Bean(RoutingFunction.FUNCTION_NAME)
RoutingFunction functionRouter(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
BeanFactory beanFactory) {
return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory));
BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory), routingCallback);
}
private boolean isConverterEligible(Object messageConverter) {

View File

@@ -26,6 +26,7 @@ import reactor.core.publisher.Mono;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.BeanResolver;
@@ -63,14 +64,17 @@ public class RoutingFunction implements Function<Object, Object> {
private final FunctionProperties functionProperties;
private final MessageRoutingCallback routingCallback;
public RoutingFunction(FunctionCatalog functionCatalog, FunctionProperties functionProperties) {
this(functionCatalog, functionProperties, null);
this(functionCatalog, functionProperties, null, null);
}
public RoutingFunction(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
BeanResolver beanResolver) {
BeanResolver beanResolver, MessageRoutingCallback routingCallback) {
this.functionCatalog = functionCatalog;
this.functionProperties = functionProperties;
this.routingCallback = routingCallback;
this.evalContext.addPropertyAccessor(new MapAccessor());
evalContext.setBeanResolver(beanResolver);
}
@@ -80,6 +84,7 @@ public class RoutingFunction implements Function<Object, Object> {
return this.route(input, input instanceof Publisher);
}
/*
* - Check if spring.cloud.function.definition is set in header and if it is use it.
* If NOT
@@ -90,45 +95,56 @@ public class RoutingFunction implements Function<Object, Object> {
* - Fail
*/
private Object route(Object input, boolean originalInputIsPublisher) {
FunctionInvocationWrapper function;
FunctionInvocationWrapper function = null;
if (input instanceof Message) {
Message<?> message = (Message<?>) input;
if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.definition"))) {
function = functionFromDefinition((String) message.getHeaders().get("spring.cloud.function.definition"));
if (function.isInputTypePublisher()) {
this.assertOriginalInputIsNotPublisher(originalInputIsPublisher);
if (this.routingCallback != null) {
function = this.functionFromCallback(message);
}
if (function == null) {
if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.definition"))) {
function = functionFromDefinition((String) message.getHeaders().get("spring.cloud.function.definition"));
if (function.isInputTypePublisher()) {
this.assertOriginalInputIsNotPublisher(originalInputIsPublisher);
}
}
}
else if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.routing-expression"))) {
function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message);
if (function.isInputTypePublisher()) {
this.assertOriginalInputIsNotPublisher(originalInputIsPublisher);
else if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.routing-expression"))) {
function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message);
if (function.isInputTypePublisher()) {
this.assertOriginalInputIsNotPublisher(originalInputIsPublisher);
}
}
else if (StringUtils.hasText(functionProperties.getRoutingExpression())) {
function = this.functionFromExpression(functionProperties.getRoutingExpression(), message);
}
else if (StringUtils.hasText(functionProperties.getDefinition())) {
function = this.functionFromDefinition(functionProperties.getDefinition());
}
else {
throw new IllegalStateException("Failed to establish route, since neither were provided: "
+ "'spring.cloud.function.definition' as Message header or as application property or "
+ "'spring.cloud.function.routing-expression' as application property.");
}
}
else if (StringUtils.hasText(functionProperties.getRoutingExpression())) {
function = this.functionFromExpression(functionProperties.getRoutingExpression(), message);
}
else if (StringUtils.hasText(functionProperties.getDefinition())) {
function = functionFromDefinition(functionProperties.getDefinition());
}
else {
throw new IllegalStateException("Failed to establish route, since neither were provided: "
+ "'spring.cloud.function.definition' as Message header or as application property or "
+ "'spring.cloud.function.routing-expression' as application property.");
}
}
else if (input instanceof Publisher) {
if (StringUtils.hasText(functionProperties.getRoutingExpression())) {
function = this.functionFromExpression(functionProperties.getRoutingExpression(), input);
if (this.routingCallback != null) {
function = this.functionFromCallback(input);
}
else
if (StringUtils.hasText(functionProperties.getDefinition())) {
function = functionFromDefinition(functionProperties.getDefinition());
}
else {
return input instanceof Mono
? Mono.from((Publisher<?>) input).map(v -> route(v, originalInputIsPublisher))
: Flux.from((Publisher<?>) input).map(v -> route(v, originalInputIsPublisher));
if (function == null) {
if (StringUtils.hasText(functionProperties.getRoutingExpression())) {
function = this.functionFromExpression(functionProperties.getRoutingExpression(), input);
}
else
if (StringUtils.hasText(functionProperties.getDefinition())) {
function = functionFromDefinition(functionProperties.getDefinition());
}
else {
return input instanceof Mono
? Mono.from((Publisher<?>) input).map(v -> route(v, originalInputIsPublisher))
: Flux.from((Publisher<?>) input).map(v -> route(v, originalInputIsPublisher));
}
}
}
else {
@@ -156,6 +172,22 @@ public class RoutingFunction implements Function<Object, Object> {
+ "spring.cloud.function.routing-expression' as application properties.");
}
private FunctionInvocationWrapper functionFromCallback(Object input) {
if (input instanceof Message) {
String functionDefinition = this.routingCallback.route((Message<?>) input, this.functionProperties);
if (StringUtils.hasText(functionDefinition)) {
return this.functionFromDefinition(functionDefinition);
}
}
else {
String functionDefinition = this.routingCallback.route((Publisher<?>) input, this.functionProperties);
if (StringUtils.hasText(functionDefinition)) {
return this.functionFromDefinition(functionDefinition);
}
}
return null;
}
private FunctionInvocationWrapper functionFromDefinition(String definition) {
FunctionInvocationWrapper function = functionCatalog.lookup(definition);
Assert.notNull(function, "Failed to lookup function to route based on the value of 'spring.cloud.function.definition' property '"