GH-652 Improve error messaging and function exist check
FunctionRSocketUtils looks up function by a bean name hen it attempts to determine if there is a remote routing. That is not correct since bean name may not exist if function was manually registered. Also, the error message 'Must only contain one output redirect' was not clear as it was not showing the actual function name for propper debugging Resolves #651
This commit is contained in:
@@ -150,38 +150,39 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler {
|
||||
getReactiveAdapterRegistry()));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private String discoverAndInjectDestinationHeader(Message<?> message) {
|
||||
|
||||
String destination;
|
||||
if (StringUtils.hasText(this.functionProperties.getRoutingExpression())) {
|
||||
destination = RoutingFunction.FUNCTION_NAME;
|
||||
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
|
||||
.getField(this.headersField, message.getHeaders());
|
||||
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
|
||||
headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination));
|
||||
this.updateMessageHeaders(message, destination);
|
||||
}
|
||||
else {
|
||||
Route route = (Route) message.getHeaders().get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER);
|
||||
destination = route.value();
|
||||
if (!StringUtils.hasText(destination)) {
|
||||
destination = this.functionProperties.getDefinition();
|
||||
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
|
||||
.getField(this.headersField, message.getHeaders());
|
||||
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
|
||||
headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination));
|
||||
this.updateMessageHeaders(message, destination);
|
||||
}
|
||||
}
|
||||
|
||||
if (!StringUtils.hasText(destination) && logger.isDebugEnabled()) {
|
||||
logger.debug("Failed to discover function definition. Neither "
|
||||
+ "`spring.cloud.function.definition`, nor `.route(<function.definition>)`, nor "
|
||||
+ "`spring.cloud.function.routing-expression` were provided. Wil use empty string "
|
||||
+ "`spring.cloud.function.routing-expression` were provided. Will use empty string "
|
||||
+ "for lookup, which will work only if there is one function in Function Catalog");
|
||||
}
|
||||
return destination;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void updateMessageHeaders(Message<?> message, String destination) {
|
||||
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
|
||||
.getField(this.headersField, message.getHeaders());
|
||||
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
|
||||
headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination));
|
||||
}
|
||||
|
||||
protected static final class MessageHandlerMethodArgumentResolver implements SyncHandlerMethodArgumentResolver {
|
||||
|
||||
private final Decoder<byte[]> decoder = new ByteArrayDecoder();
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
* Copyright 2020-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -33,6 +33,7 @@ import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.messaging.rsocket.RSocketRequester;
|
||||
import org.springframework.messaging.rsocket.RSocketRequester.Builder;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
@@ -60,14 +61,9 @@ final class FunctionRSocketUtils {
|
||||
String acceptContentType = functionProperties.getExpectedContentType();
|
||||
if (!StringUtils.hasText(acceptContentType)) {
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(functionDefinition);
|
||||
//Type functionType = function.getFunctionType();
|
||||
Type outputType = function.getOutputType();
|
||||
if (outputType instanceof Class && String.class.isAssignableFrom((Class<?>) outputType)) {
|
||||
acceptContentType = "text/plain";
|
||||
}
|
||||
else {
|
||||
acceptContentType = "application/json";
|
||||
}
|
||||
acceptContentType = (outputType instanceof Class && String.class.isAssignableFrom((Class<?>) outputType))
|
||||
? MimeTypeUtils.TEXT_PLAIN_VALUE : MimeTypeUtils.APPLICATION_JSON_VALUE;
|
||||
}
|
||||
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(functionDefinition, acceptContentType);
|
||||
@@ -78,13 +74,18 @@ final class FunctionRSocketUtils {
|
||||
ApplicationContext applicationContext) {
|
||||
String[] names = StringUtils.delimitedListToStringArray(definition.replaceAll(",", "|").trim(), "|");
|
||||
for (String name : names) {
|
||||
if (!applicationContext.containsBean(name)) { // this means RSocket
|
||||
|
||||
if (functionCatalog.lookup(name) == null) { // this means RSocket
|
||||
String[] functionToRSocketDefinition = StringUtils.delimitedListToStringArray(name, ">");
|
||||
if (functionToRSocketDefinition.length == 1) {
|
||||
throw new IllegalArgumentException("Function definition '" + name + "' does not exist in Function Catalog");
|
||||
}
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("Registering RSocket forwarder for '" + name + "' function.");
|
||||
}
|
||||
String[] functionToRSocketDefinition = StringUtils.delimitedListToStringArray(name, ">");
|
||||
Assert.isTrue(functionToRSocketDefinition.length == 2, "Must only contain one output redirect");
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(functionToRSocketDefinition[0], "application/json");
|
||||
|
||||
Assert.isTrue(functionToRSocketDefinition.length == 2, "Must only contain one output redirect. Was '" + name + "'.");
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(functionToRSocketDefinition[0], MimeTypeUtils.APPLICATION_JSON_VALUE);
|
||||
|
||||
String[] hostPort = StringUtils.delimitedListToStringArray(functionToRSocketDefinition[1], ":");
|
||||
|
||||
|
||||
@@ -139,9 +139,6 @@ public class RSocketAutoConfigurationRoutingTests {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
public static class SampleFunctionConfiguration {
|
||||
|
||||
@@ -47,6 +47,56 @@ import org.springframework.util.SocketUtils;
|
||||
* @since 3.1
|
||||
*/
|
||||
public class RSocketAutoConfigurationTests {
|
||||
|
||||
@Test
|
||||
public void testNonExistingFunctionInRoute() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
try (
|
||||
ConfigurableApplicationContext applicationContext =
|
||||
new SpringApplicationBuilder(SampleFunctionConfiguration.class)
|
||||
.web(WebApplicationType.NONE)
|
||||
.run("--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.rsocket.server.port=" + port);
|
||||
) {
|
||||
RSocketRequester.Builder rsocketRequesterBuilder =
|
||||
applicationContext.getBean(RSocketRequester.Builder.class);
|
||||
|
||||
rsocketRequesterBuilder.tcp("localhost", port)
|
||||
.route("foo")
|
||||
.data("\"hello\"")
|
||||
.retrieveMono(String.class)
|
||||
.as(StepVerifier::create)
|
||||
.expectError()
|
||||
.verify();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonExistingFunctionInRouteSingleFunctionInCatalog() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
try (
|
||||
ConfigurableApplicationContext applicationContext =
|
||||
new SpringApplicationBuilder(SingleFunctionConfiguration.class)
|
||||
.web(WebApplicationType.NONE)
|
||||
.run("--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.rsocket.server.port=" + port);
|
||||
) {
|
||||
RSocketRequester.Builder rsocketRequesterBuilder =
|
||||
applicationContext.getBean(RSocketRequester.Builder.class);
|
||||
|
||||
rsocketRequesterBuilder.tcp("localhost", port)
|
||||
.route("blah")
|
||||
.data("\"hello\"")
|
||||
.retrieveMono(String.class)
|
||||
.as(StepVerifier::create)
|
||||
.expectNext("hello")
|
||||
.expectComplete()
|
||||
.verify();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testImperativeFunctionAsRequestReplyWithDefinition() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
@@ -535,4 +585,13 @@ public class RSocketAutoConfigurationTests {
|
||||
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
public static class SingleFunctionConfiguration {
|
||||
@Bean
|
||||
public Function<String, String> echo() {
|
||||
return v -> v;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user