GH-653 Make consistent priority order of function definition in RSocket

With this commit spring.cloud.function.routing-expression property takes precedence over all, then route(<function.definition>) and then spring.cloud.function.definition property
This commit is contained in:
Oleg Zhurakousky
2021-02-22 11:11:12 +01:00
parent 1f2c3d8017
commit 57776c6b0c
3 changed files with 51 additions and 39 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -59,6 +59,7 @@ import org.springframework.messaging.rsocket.annotation.support.RSocketPayloadRe
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.RouteMatcher.Route;
import org.springframework.util.StringUtils;
import org.springframework.web.util.pattern.PathPatternRouteMatcher;
@@ -116,10 +117,7 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler {
@Override
public Mono<Void> handleMessage(Message<?> message) throws MessagingException {
if (!FrameType.SETUP.equals(message.getHeaders().get("rsocketFrameType"))) {
String destination = this.getDestination(message).value();
if (!StringUtils.hasText(destination)) {
destination = this.discoverAndInjectDestinationHeader(message);
}
String destination = this.discoverAndInjectDestinationHeader(message);
Set<String> mappings = this.getDestinationLookup().keySet();
if (!mappings.contains(destination)) {
@@ -154,16 +152,33 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler {
@SuppressWarnings("unchecked")
private String discoverAndInjectDestinationHeader(Message<?> message) {
String destination = this.functionProperties.getDefinition();
if (!StringUtils.hasText(destination) && StringUtils.hasText(this.functionProperties.getRoutingExpression())) {
String destination;
if (StringUtils.hasText(this.functionProperties.getRoutingExpression())) {
destination = RoutingFunction.FUNCTION_NAME;
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
.getField(this.headersField, message.getHeaders());
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination));
}
else {
Route route = (Route) message.getHeaders().get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER);
destination = route.value();
if (!StringUtils.hasText(destination)) {
destination = this.functionProperties.getDefinition();
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
.getField(this.headersField, message.getHeaders());
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination));
}
}
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
.getField(this.headersField, message.getHeaders());
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination));
if (!StringUtils.hasText(destination) && logger.isDebugEnabled()) {
logger.debug("Failed to discover function definition. Neither "
+ "`spring.cloud.function.definition`, nor `.route(<function.definition>)`, nor "
+ "`spring.cloud.function.routing-expression` were provided. Wil use empty string "
+ "for lookup, which will work only if there is one function in Function Catalog");
}
return destination;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
package org.springframework.cloud.function.rsocket;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -24,14 +23,10 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketMessageHandlerCusto
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.StringUtils;
/**
* Main configuration class for components required to support RSocket integration with
@@ -45,14 +40,8 @@ import org.springframework.util.StringUtils;
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties({ FunctionProperties.class, RSocketFunctionProperties.class })
@ConditionalOnProperty(name = FunctionProperties.PREFIX + ".rsocket.enabled", matchIfMissing = true)
class RSocketAutoConfiguration implements ApplicationContextAware {
class RSocketAutoConfiguration {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Bean
@ConditionalOnMissingBean
@@ -64,18 +53,6 @@ class RSocketAutoConfiguration implements ApplicationContextAware {
FunctionRSocketMessageHandler rsocketMessageHandler = new FunctionRSocketMessageHandler(functionCatalog, functionProperties);
rsocketMessageHandler.setRSocketStrategies(rSocketStrategies);
customizers.orderedStream().forEach((customizer) -> customizer.customize(rsocketMessageHandler));
registerFunctionsWithRSocketHandler(rsocketMessageHandler, functionCatalog, functionProperties);
return rsocketMessageHandler;
}
private void registerFunctionsWithRSocketHandler(FunctionRSocketMessageHandler rsocketMessageHandler,
FunctionCatalog functionCatalog, FunctionProperties functionProperties) {
String definition = functionProperties.getDefinition();
if (StringUtils.hasText(definition)) {
FunctionInvocationWrapper function = FunctionRSocketUtils
.registerFunctionForDestination(definition, functionCatalog, this.applicationContext);
rsocketMessageHandler.registerFunctionHandler(new RSocketListenerFunction(function), definition);
}
}
}

View File

@@ -57,6 +57,16 @@ public class RSocketAutoConfigurationRoutingTests {
RSocketRequester.Builder rsocketRequesterBuilder =
applicationContext.getBean(RSocketRequester.Builder.class);
rsocketRequesterBuilder.tcp("localhost", port)
.route("uppercase")
.metadata("{\"func_name\":\"echo\"}", MimeTypeUtils.APPLICATION_JSON)
.data("hello")
.retrieveMono(String.class)
.as(StepVerifier::create)
.expectNext("hello")
.expectComplete()
.verify();
rsocketRequesterBuilder.tcp("localhost", port)
.route("")
.metadata("{\"func_name\":\"echo\"}", MimeTypeUtils.APPLICATION_JSON)
@@ -69,13 +79,23 @@ public class RSocketAutoConfigurationRoutingTests {
rsocketRequesterBuilder.tcp("localhost", port)
.route(RoutingFunction.FUNCTION_NAME)
.metadata("{\"func_name\":\"uppercase\"}", MimeTypeUtils.APPLICATION_JSON)
.metadata("{\"func_name\":\"echo\"}", MimeTypeUtils.APPLICATION_JSON)
.data("hello")
.retrieveMono(String.class)
.as(StepVerifier::create)
.expectNext("HELLO")
.expectNext("hello")
.expectComplete()
.verify();
// rsocketRequesterBuilder.tcp("localhost", port)
// .route(RoutingFunction.FUNCTION_NAME)
// .metadata("{\"func_name\":\"uppercase\"}", MimeTypeUtils.APPLICATION_JSON)
// .data("hello")
// .retrieveMono(String.class)
// .as(StepVerifier::create)
// .expectNext("HELLO")
// .expectComplete()
// .verify();
}
}