Inject LOOKUP_DESTINATION_HEADER if definition property is not used
This commit is contained in:
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.function.rsocket;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@@ -29,6 +30,7 @@ import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.FunctionProperties;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
@@ -39,8 +41,10 @@ import org.springframework.core.codec.ByteArrayEncoder;
|
||||
import org.springframework.core.codec.Decoder;
|
||||
import org.springframework.core.codec.Encoder;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.http.server.PathContainer;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.handler.CompositeMessageCondition;
|
||||
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
|
||||
@@ -53,8 +57,13 @@ import org.springframework.messaging.rsocket.annotation.support.RSocketFrameType
|
||||
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
|
||||
import org.springframework.messaging.rsocket.annotation.support.RSocketPayloadReturnValueHandler;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.AntPathMatcher;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
import org.springframework.util.RouteMatcher;
|
||||
import org.springframework.util.SimpleRouteMatcher;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.util.pattern.PathPatternRouteMatcher;
|
||||
|
||||
/**
|
||||
* An {@link RSocketMessageHandler} extension for Spring Cloud Function specifics.
|
||||
@@ -68,6 +77,10 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler {
|
||||
|
||||
private final FunctionCatalog functionCatalog;
|
||||
|
||||
private final FunctionProperties functionProperties;
|
||||
|
||||
private final Field headersField;
|
||||
|
||||
private static final Method FUNCTION_APPLY_METHOD =
|
||||
ReflectionUtils.findMethod(Function.class, "apply", (Class<?>[]) null);
|
||||
|
||||
@@ -78,9 +91,12 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler {
|
||||
FrameType.REQUEST_STREAM,
|
||||
FrameType.REQUEST_CHANNEL);
|
||||
|
||||
FunctionRSocketMessageHandler(FunctionCatalog functionCatalog) {
|
||||
FunctionRSocketMessageHandler(FunctionCatalog functionCatalog, FunctionProperties functionProperties) {
|
||||
setHandlerPredicate((clazz) -> false);
|
||||
this.functionCatalog = functionCatalog;
|
||||
this.functionProperties = functionProperties;
|
||||
this.headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
|
||||
this.headersField.setAccessible(true);
|
||||
}
|
||||
|
||||
|
||||
@@ -100,11 +116,21 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler {
|
||||
* Will check if there is a function handler registered for destination before proceeding.
|
||||
* This typically happens when user avoids using 'spring.cloud.function.definition' property.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Mono<Void> handleMessage(Message<?> message) throws MessagingException {
|
||||
|
||||
if (!FrameType.SETUP.equals(message.getHeaders().get("rsocketFrameType"))) {
|
||||
String destination = this.getDestination(message).value();
|
||||
if (!StringUtils.hasText(destination)) {
|
||||
destination = this.functionProperties.getDefinition();
|
||||
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
|
||||
.getField(this.headersField, message.getHeaders());
|
||||
|
||||
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
|
||||
|
||||
headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination));
|
||||
}
|
||||
Set<String> mappings = this.getDestinationLookup().keySet();
|
||||
if (!mappings.contains(destination)) {
|
||||
FunctionInvocationWrapper function = FunctionRSocketUtils
|
||||
|
||||
@@ -61,7 +61,7 @@ class RSocketAutoConfiguration implements ApplicationContextAware {
|
||||
ObjectProvider<RSocketMessageHandlerCustomizer> customizers, FunctionCatalog functionCatalog,
|
||||
FunctionProperties functionProperties) {
|
||||
|
||||
FunctionRSocketMessageHandler rsocketMessageHandler = new FunctionRSocketMessageHandler(functionCatalog);
|
||||
FunctionRSocketMessageHandler rsocketMessageHandler = new FunctionRSocketMessageHandler(functionCatalog, functionProperties);
|
||||
rsocketMessageHandler.setRSocketStrategies(rSocketStrategies);
|
||||
customizers.orderedStream().forEach((customizer) -> customizer.customize(rsocketMessageHandler));
|
||||
registerFunctionsWithRSocketHandler(rsocketMessageHandler, functionCatalog, functionProperties);
|
||||
@@ -75,7 +75,6 @@ class RSocketAutoConfiguration implements ApplicationContextAware {
|
||||
FunctionInvocationWrapper function = FunctionRSocketUtils
|
||||
.registerFunctionForDestination(definition, functionCatalog, this.applicationContext);
|
||||
rsocketMessageHandler.registerFunctionHandler(new RSocketListenerFunction(function), definition);
|
||||
rsocketMessageHandler.registerFunctionHandler(new RSocketListenerFunction(function), "");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user