From e4e8d22f7fd634159460f8e81b9a7723322cc40d Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Fri, 28 Aug 2020 17:47:54 +0200 Subject: [PATCH] Code cleanup for rsocket module --- .../FunctionRSocketMessageHandler.java | 14 +++++++---- .../rsocket/FunctionRSocketUtils.java | 25 ++++++++----------- .../rsocket/RSocketAutoConfiguration.java | 5 ++-- .../rsocket/RSocketForwardingFunction.java | 2 +- .../rsocket/RSocketListenerFunction.java | 2 +- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java index 8dcab9a27..6e8c2d612 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java @@ -59,7 +59,7 @@ import org.springframework.util.ReflectionUtils; * * @since 3.1 */ -public class FunctionRSocketMessageHandler extends RSocketMessageHandler { +class FunctionRSocketMessageHandler extends RSocketMessageHandler { private final FunctionCatalog functionCatalog; @@ -73,7 +73,7 @@ public class FunctionRSocketMessageHandler extends RSocketMessageHandler { FrameType.REQUEST_STREAM, FrameType.REQUEST_CHANNEL); - public FunctionRSocketMessageHandler(FunctionCatalog functionCatalog) { + FunctionRSocketMessageHandler(FunctionCatalog functionCatalog) { setHandlerPredicate((clazz) -> false); this.functionCatalog = functionCatalog; } @@ -85,14 +85,18 @@ public class FunctionRSocketMessageHandler extends RSocketMessageHandler { super.afterPropertiesSet(); } + /** + * Will check if there is a function handler registered for destination before proceeding. + * This typically happens when user avoids using 'spring.cloud.function.definition' property. + */ @Override public Mono handleMessage(Message message) throws MessagingException { if (!FrameType.SETUP.equals(message.getHeaders().get("rsocketFrameType"))) { String destination = this.getDestination(message).value(); Set mappings = this.getDestinationLookup().keySet(); if (!mappings.contains(destination)) { - FunctionRSocketUtils.registerRSocketForwardingFunctionIfNecessary(destination, functionCatalog, this.getApplicationContext()); - FunctionInvocationWrapper function = functionCatalog.lookup(destination, "application/json"); + FunctionInvocationWrapper function = FunctionRSocketUtils + .registerFunctionForDestination(destination, functionCatalog, this.getApplicationContext()); this.registerFunctionHandler(new RSocketListenerFunction(function), destination); } } @@ -100,7 +104,7 @@ public class FunctionRSocketMessageHandler extends RSocketMessageHandler { return super.handleMessage(message); } - public void registerFunctionHandler(Function function, String route) { + void registerFunctionHandler(Function function, String route) { CompositeMessageCondition condition = new CompositeMessageCondition(REQUEST_CONDITION, new DestinationPatternsMessageCondition(new String[]{ route }, diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketUtils.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketUtils.java index b4d745df1..94126d7f4 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketUtils.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketUtils.java @@ -50,10 +50,16 @@ final class FunctionRSocketUtils { } - static String registerRSocketForwardingFunctionIfNecessary(String definition, FunctionCatalog functionCatalog, + static FunctionInvocationWrapper registerFunctionForDestination(String destination, FunctionCatalog functionCatalog, + ApplicationContext applicationContext) { + registerRSocketForwardingFunctionIfNecessary(destination, functionCatalog, applicationContext); + FunctionInvocationWrapper function = functionCatalog.lookup(destination, "application/json"); + return function; + } + + static void registerRSocketForwardingFunctionIfNecessary(String definition, FunctionCatalog functionCatalog, ApplicationContext applicationContext) { String[] names = StringUtils.delimitedListToStringArray(definition.replaceAll(",", "|").trim(), "|"); - String rootFunctionName = names[0]; for (String name : names) { if (!applicationContext.containsBean(name)) { // this means RSocket if (LOGGER.isDebugEnabled()) { @@ -65,18 +71,11 @@ final class FunctionRSocketUtils { String[] hostPort = StringUtils.delimitedListToStringArray(functionToRSocketDefinition[1], ":"); - rootFunctionName = function.getFunctionDefinition(); String forwardingUrl = functionToRSocketDefinition[1]; - RSocketRequester rsocketRequester; - Builder rsocketRequesterBuilder = applicationContext.getBean(Builder.class); - - if (WS_URI_PATTERN.matcher(forwardingUrl).matches()) { - rsocketRequester = rsocketRequesterBuilder.websocket(URI.create(forwardingUrl)); - } - else { - rsocketRequester = rsocketRequesterBuilder.tcp(hostPort[0], Integer.parseInt(hostPort[1])); - } + RSocketRequester rsocketRequester = (WS_URI_PATTERN.matcher(forwardingUrl).matches()) + ? rsocketRequesterBuilder.websocket(URI.create(forwardingUrl)) + : rsocketRequesterBuilder.tcp(hostPort[0], Integer.parseInt(hostPort[1])); RSocketForwardingFunction rsocketFunction = new RSocketForwardingFunction(function, rsocketRequester, null); @@ -87,7 +86,5 @@ final class FunctionRSocketUtils { ((FunctionRegistry) functionCatalog).register(functionRegistration); } } - - return rootFunctionName; } } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java index 3a502c0ec..7c501d47e 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java @@ -70,9 +70,8 @@ class RSocketAutoConfiguration implements ApplicationContextAware { FunctionCatalog functionCatalog, FunctionProperties functionProperties) { String definition = functionProperties.getDefinition(); if (StringUtils.hasText(definition)) { - FunctionRSocketUtils.registerRSocketForwardingFunctionIfNecessary(definition, functionCatalog, this.applicationContext); - //TODO externalize content-type - FunctionInvocationWrapper function = functionCatalog.lookup(definition, "application/json"); + FunctionInvocationWrapper function = FunctionRSocketUtils + .registerFunctionForDestination(definition, functionCatalog, this.applicationContext); rsocketMessageHandler.registerFunctionHandler(new RSocketListenerFunction(function), definition); rsocketMessageHandler.registerFunctionHandler(new RSocketListenerFunction(function), ""); } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java index 63343965d..8da7e76e4 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java @@ -34,7 +34,7 @@ import org.springframework.messaging.support.GenericMessage; * which will use the result of the invocation of such function as an input to another RSocket * effectively composing two functions over RSocket. *

- * Note: the remote RSocket route is not necessary to be as a Spring Cloud Function binding. + * Note: the remote RSocket route is not required to represent Spring Cloud Function binding. * * @author Oleg Zhurakousky * @author Artem Bilan diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java index 8f2865378..6984fa623 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java @@ -40,7 +40,7 @@ import org.springframework.messaging.support.MessageBuilder; * * @since 3.1 */ -public class RSocketListenerFunction implements Function>, Publisher> { +class RSocketListenerFunction implements Function>, Publisher> { private final FunctionInvocationWrapper targetFunction;