From 57776c6b0cc8dec3cfb0a23ef105c441ba0ae809 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 22 Feb 2021 11:11:12 +0100 Subject: [PATCH] GH-653 Make consistent priority order of function definition in RSocket With this commit spring.cloud.function.routing-expression property takes precedence over all, then route() and then spring.cloud.function.definition property --- .../FunctionRSocketMessageHandler.java | 39 +++++++++++++------ .../rsocket/RSocketAutoConfiguration.java | 27 +------------ .../RSocketAutoConfigurationRoutingTests.java | 24 +++++++++++- 3 files changed, 51 insertions(+), 39 deletions(-) diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java index 990b642dc..867a093a7 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 the original author or authors. + * Copyright 2020-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,6 +59,7 @@ import org.springframework.messaging.rsocket.annotation.support.RSocketPayloadRe import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.MimeTypeUtils; import org.springframework.util.ReflectionUtils; +import org.springframework.util.RouteMatcher.Route; import org.springframework.util.StringUtils; import org.springframework.web.util.pattern.PathPatternRouteMatcher; @@ -116,10 +117,7 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler { @Override public Mono handleMessage(Message message) throws MessagingException { if (!FrameType.SETUP.equals(message.getHeaders().get("rsocketFrameType"))) { - String destination = this.getDestination(message).value(); - if (!StringUtils.hasText(destination)) { - destination = this.discoverAndInjectDestinationHeader(message); - } + String destination = this.discoverAndInjectDestinationHeader(message); Set mappings = this.getDestinationLookup().keySet(); if (!mappings.contains(destination)) { @@ -154,16 +152,33 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler { @SuppressWarnings("unchecked") private String discoverAndInjectDestinationHeader(Message message) { - String destination = this.functionProperties.getDefinition(); - if (!StringUtils.hasText(destination) && StringUtils.hasText(this.functionProperties.getRoutingExpression())) { + + String destination; + if (StringUtils.hasText(this.functionProperties.getRoutingExpression())) { destination = RoutingFunction.FUNCTION_NAME; + Map headersMap = (Map) ReflectionUtils + .getField(this.headersField, message.getHeaders()); + PathPatternRouteMatcher matcher = new PathPatternRouteMatcher(); + headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination)); + } + else { + Route route = (Route) message.getHeaders().get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER); + destination = route.value(); + if (!StringUtils.hasText(destination)) { + destination = this.functionProperties.getDefinition(); + Map headersMap = (Map) ReflectionUtils + .getField(this.headersField, message.getHeaders()); + PathPatternRouteMatcher matcher = new PathPatternRouteMatcher(); + headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination)); + } } - Map headersMap = (Map) ReflectionUtils - .getField(this.headersField, message.getHeaders()); - PathPatternRouteMatcher matcher = new PathPatternRouteMatcher(); - - headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination)); + if (!StringUtils.hasText(destination) && logger.isDebugEnabled()) { + logger.debug("Failed to discover function definition. Neither " + + "`spring.cloud.function.definition`, nor `.route()`, nor " + + "`spring.cloud.function.routing-expression` were provided. Wil use empty string " + + "for lookup, which will work only if there is one function in Function Catalog"); + } return destination; } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java index dc9b18f20..e7bbf436f 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 the original author or authors. + * Copyright 2020-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ package org.springframework.cloud.function.rsocket; -import org.springframework.beans.BeansException; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -24,14 +23,10 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketMessageHandlerCusto import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; -import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.messaging.rsocket.RSocketStrategies; -import org.springframework.util.StringUtils; /** * Main configuration class for components required to support RSocket integration with @@ -45,14 +40,8 @@ import org.springframework.util.StringUtils; @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties({ FunctionProperties.class, RSocketFunctionProperties.class }) @ConditionalOnProperty(name = FunctionProperties.PREFIX + ".rsocket.enabled", matchIfMissing = true) -class RSocketAutoConfiguration implements ApplicationContextAware { +class RSocketAutoConfiguration { - private ApplicationContext applicationContext; - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = applicationContext; - } @Bean @ConditionalOnMissingBean @@ -64,18 +53,6 @@ class RSocketAutoConfiguration implements ApplicationContextAware { FunctionRSocketMessageHandler rsocketMessageHandler = new FunctionRSocketMessageHandler(functionCatalog, functionProperties); rsocketMessageHandler.setRSocketStrategies(rSocketStrategies); customizers.orderedStream().forEach((customizer) -> customizer.customize(rsocketMessageHandler)); - registerFunctionsWithRSocketHandler(rsocketMessageHandler, functionCatalog, functionProperties); return rsocketMessageHandler; } - - private void registerFunctionsWithRSocketHandler(FunctionRSocketMessageHandler rsocketMessageHandler, - FunctionCatalog functionCatalog, FunctionProperties functionProperties) { - String definition = functionProperties.getDefinition(); - if (StringUtils.hasText(definition)) { - FunctionInvocationWrapper function = FunctionRSocketUtils - .registerFunctionForDestination(definition, functionCatalog, this.applicationContext); - rsocketMessageHandler.registerFunctionHandler(new RSocketListenerFunction(function), definition); - } - } - } diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java index e37a9b900..82a9a290d 100644 --- a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java @@ -57,6 +57,16 @@ public class RSocketAutoConfigurationRoutingTests { RSocketRequester.Builder rsocketRequesterBuilder = applicationContext.getBean(RSocketRequester.Builder.class); + rsocketRequesterBuilder.tcp("localhost", port) + .route("uppercase") + .metadata("{\"func_name\":\"echo\"}", MimeTypeUtils.APPLICATION_JSON) + .data("hello") + .retrieveMono(String.class) + .as(StepVerifier::create) + .expectNext("hello") + .expectComplete() + .verify(); + rsocketRequesterBuilder.tcp("localhost", port) .route("") .metadata("{\"func_name\":\"echo\"}", MimeTypeUtils.APPLICATION_JSON) @@ -69,13 +79,23 @@ public class RSocketAutoConfigurationRoutingTests { rsocketRequesterBuilder.tcp("localhost", port) .route(RoutingFunction.FUNCTION_NAME) - .metadata("{\"func_name\":\"uppercase\"}", MimeTypeUtils.APPLICATION_JSON) + .metadata("{\"func_name\":\"echo\"}", MimeTypeUtils.APPLICATION_JSON) .data("hello") .retrieveMono(String.class) .as(StepVerifier::create) - .expectNext("HELLO") + .expectNext("hello") .expectComplete() .verify(); + +// rsocketRequesterBuilder.tcp("localhost", port) +// .route(RoutingFunction.FUNCTION_NAME) +// .metadata("{\"func_name\":\"uppercase\"}", MimeTypeUtils.APPLICATION_JSON) +// .data("hello") +// .retrieveMono(String.class) +// .as(StepVerifier::create) +// .expectNext("HELLO") +// .expectComplete() +// .verify(); } }