From a4763ee8791e51ff136d08b6c46ad3bda171f60c Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 15 Sep 2020 13:51:49 +0200 Subject: [PATCH] Inject LOOKUP_DESTINATION_HEADER if definition property is not used --- .../function/context/FunctionProperties.java | 22 +++++++++++++++ .../catalog/SimpleFunctionRegistry.java | 3 +- .../FunctionRSocketMessageHandler.java | 28 ++++++++++++++++++- .../rsocket/RSocketAutoConfiguration.java | 3 +- 4 files changed, 52 insertions(+), 4 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java index 897e319c2..89b9f950a 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java @@ -43,6 +43,12 @@ public class FunctionProperties { */ private String definition; + + private String contentType = "application/json"; + + + private String accept = "application/json"; + /** * SpEL expression which should result in function definition (e.g., function name or composition instruction). * NOTE: SpEL evaluation context's root object is the input argument (e.g., Message). @@ -64,4 +70,20 @@ public class FunctionProperties { public void setRoutingExpression(String routingExpression) { this.routingExpression = routingExpression; } + + public String getContentType() { + return contentType; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } + + public String getAccept() { + return accept; + } + + public void setAccept(String accept) { + this.accept = accept; + } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index c605415c5..50638cf4d 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -173,6 +173,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect if (this.functionAroundWrapper != null && function != null) { return (T) new FunctionInvocationWrapper(function) { + @SuppressWarnings("rawtypes") @Override Object doApply(Object input, boolean consumer, Function enricher) { return functionAroundWrapper.apply(input, function); @@ -805,7 +806,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect // this needs revisiting as the type is not always Class (think really complex types) Type rawType = FunctionTypeUtils.unwrapActualTypeByIndex(type, 0); if (logger.isDebugEnabled()) { - logger.debug("Raw type of value: " + value + "is " + rawType); + logger.debug("Raw type of value: " + value + " is " + rawType); } if (rawType instanceof ParameterizedType) { diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java index 9205b7aed..f93c250dc 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java @@ -16,6 +16,7 @@ package org.springframework.cloud.function.rsocket; +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.Collections; import java.util.List; @@ -29,6 +30,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.core.MethodParameter; import org.springframework.core.ParameterizedTypeReference; @@ -39,8 +41,10 @@ import org.springframework.core.codec.ByteArrayEncoder; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.server.PathContainer; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessagingException; import org.springframework.messaging.handler.CompositeMessageCondition; import org.springframework.messaging.handler.DestinationPatternsMessageCondition; @@ -53,8 +57,13 @@ import org.springframework.messaging.rsocket.annotation.support.RSocketFrameType import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.messaging.rsocket.annotation.support.RSocketPayloadReturnValueHandler; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.AntPathMatcher; import org.springframework.util.MimeTypeUtils; import org.springframework.util.ReflectionUtils; +import org.springframework.util.RouteMatcher; +import org.springframework.util.SimpleRouteMatcher; +import org.springframework.util.StringUtils; +import org.springframework.web.util.pattern.PathPatternRouteMatcher; /** * An {@link RSocketMessageHandler} extension for Spring Cloud Function specifics. @@ -68,6 +77,10 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler { private final FunctionCatalog functionCatalog; + private final FunctionProperties functionProperties; + + private final Field headersField; + private static final Method FUNCTION_APPLY_METHOD = ReflectionUtils.findMethod(Function.class, "apply", (Class[]) null); @@ -78,9 +91,12 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler { FrameType.REQUEST_STREAM, FrameType.REQUEST_CHANNEL); - FunctionRSocketMessageHandler(FunctionCatalog functionCatalog) { + FunctionRSocketMessageHandler(FunctionCatalog functionCatalog, FunctionProperties functionProperties) { setHandlerPredicate((clazz) -> false); this.functionCatalog = functionCatalog; + this.functionProperties = functionProperties; + this.headersField = ReflectionUtils.findField(MessageHeaders.class, "headers"); + this.headersField.setAccessible(true); } @@ -100,11 +116,21 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler { * Will check if there is a function handler registered for destination before proceeding. * This typically happens when user avoids using 'spring.cloud.function.definition' property. */ + @SuppressWarnings("unchecked") @Override public Mono handleMessage(Message message) throws MessagingException { if (!FrameType.SETUP.equals(message.getHeaders().get("rsocketFrameType"))) { String destination = this.getDestination(message).value(); + if (!StringUtils.hasText(destination)) { + destination = this.functionProperties.getDefinition(); + Map headersMap = (Map) ReflectionUtils + .getField(this.headersField, message.getHeaders()); + + PathPatternRouteMatcher matcher = new PathPatternRouteMatcher(); + + headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination)); + } Set mappings = this.getDestinationLookup().keySet(); if (!mappings.contains(destination)) { FunctionInvocationWrapper function = FunctionRSocketUtils diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java index 0c3440a0c..dc9b18f20 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java @@ -61,7 +61,7 @@ class RSocketAutoConfiguration implements ApplicationContextAware { ObjectProvider customizers, FunctionCatalog functionCatalog, FunctionProperties functionProperties) { - FunctionRSocketMessageHandler rsocketMessageHandler = new FunctionRSocketMessageHandler(functionCatalog); + FunctionRSocketMessageHandler rsocketMessageHandler = new FunctionRSocketMessageHandler(functionCatalog, functionProperties); rsocketMessageHandler.setRSocketStrategies(rSocketStrategies); customizers.orderedStream().forEach((customizer) -> customizer.customize(rsocketMessageHandler)); registerFunctionsWithRSocketHandler(rsocketMessageHandler, functionCatalog, functionProperties); @@ -75,7 +75,6 @@ class RSocketAutoConfiguration implements ApplicationContextAware { FunctionInvocationWrapper function = FunctionRSocketUtils .registerFunctionForDestination(definition, functionCatalog, this.applicationContext); rsocketMessageHandler.registerFunctionHandler(new RSocketListenerFunction(function), definition); - rsocketMessageHandler.registerFunctionHandler(new RSocketListenerFunction(function), ""); } }