Code cleanup for rsocket module
This commit is contained in:
@@ -59,7 +59,7 @@ import org.springframework.util.ReflectionUtils;
|
||||
*
|
||||
* @since 3.1
|
||||
*/
|
||||
public class FunctionRSocketMessageHandler extends RSocketMessageHandler {
|
||||
class FunctionRSocketMessageHandler extends RSocketMessageHandler {
|
||||
|
||||
private final FunctionCatalog functionCatalog;
|
||||
|
||||
@@ -73,7 +73,7 @@ public class FunctionRSocketMessageHandler extends RSocketMessageHandler {
|
||||
FrameType.REQUEST_STREAM,
|
||||
FrameType.REQUEST_CHANNEL);
|
||||
|
||||
public FunctionRSocketMessageHandler(FunctionCatalog functionCatalog) {
|
||||
FunctionRSocketMessageHandler(FunctionCatalog functionCatalog) {
|
||||
setHandlerPredicate((clazz) -> false);
|
||||
this.functionCatalog = functionCatalog;
|
||||
}
|
||||
@@ -85,14 +85,18 @@ public class FunctionRSocketMessageHandler extends RSocketMessageHandler {
|
||||
super.afterPropertiesSet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Will check if there is a function handler registered for destination before proceeding.
|
||||
* This typically happens when user avoids using 'spring.cloud.function.definition' property.
|
||||
*/
|
||||
@Override
|
||||
public Mono<Void> handleMessage(Message<?> message) throws MessagingException {
|
||||
if (!FrameType.SETUP.equals(message.getHeaders().get("rsocketFrameType"))) {
|
||||
String destination = this.getDestination(message).value();
|
||||
Set<String> mappings = this.getDestinationLookup().keySet();
|
||||
if (!mappings.contains(destination)) {
|
||||
FunctionRSocketUtils.registerRSocketForwardingFunctionIfNecessary(destination, functionCatalog, this.getApplicationContext());
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(destination, "application/json");
|
||||
FunctionInvocationWrapper function = FunctionRSocketUtils
|
||||
.registerFunctionForDestination(destination, functionCatalog, this.getApplicationContext());
|
||||
this.registerFunctionHandler(new RSocketListenerFunction(function), destination);
|
||||
}
|
||||
}
|
||||
@@ -100,7 +104,7 @@ public class FunctionRSocketMessageHandler extends RSocketMessageHandler {
|
||||
return super.handleMessage(message);
|
||||
}
|
||||
|
||||
public void registerFunctionHandler(Function<?, ?> function, String route) {
|
||||
void registerFunctionHandler(Function<?, ?> function, String route) {
|
||||
CompositeMessageCondition condition =
|
||||
new CompositeMessageCondition(REQUEST_CONDITION,
|
||||
new DestinationPatternsMessageCondition(new String[]{ route },
|
||||
|
||||
@@ -50,10 +50,16 @@ final class FunctionRSocketUtils {
|
||||
|
||||
}
|
||||
|
||||
static String registerRSocketForwardingFunctionIfNecessary(String definition, FunctionCatalog functionCatalog,
|
||||
static FunctionInvocationWrapper registerFunctionForDestination(String destination, FunctionCatalog functionCatalog,
|
||||
ApplicationContext applicationContext) {
|
||||
registerRSocketForwardingFunctionIfNecessary(destination, functionCatalog, applicationContext);
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(destination, "application/json");
|
||||
return function;
|
||||
}
|
||||
|
||||
static void registerRSocketForwardingFunctionIfNecessary(String definition, FunctionCatalog functionCatalog,
|
||||
ApplicationContext applicationContext) {
|
||||
String[] names = StringUtils.delimitedListToStringArray(definition.replaceAll(",", "|").trim(), "|");
|
||||
String rootFunctionName = names[0];
|
||||
for (String name : names) {
|
||||
if (!applicationContext.containsBean(name)) { // this means RSocket
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
@@ -65,18 +71,11 @@ final class FunctionRSocketUtils {
|
||||
|
||||
String[] hostPort = StringUtils.delimitedListToStringArray(functionToRSocketDefinition[1], ":");
|
||||
|
||||
rootFunctionName = function.getFunctionDefinition();
|
||||
String forwardingUrl = functionToRSocketDefinition[1];
|
||||
RSocketRequester rsocketRequester;
|
||||
|
||||
Builder rsocketRequesterBuilder = applicationContext.getBean(Builder.class);
|
||||
|
||||
if (WS_URI_PATTERN.matcher(forwardingUrl).matches()) {
|
||||
rsocketRequester = rsocketRequesterBuilder.websocket(URI.create(forwardingUrl));
|
||||
}
|
||||
else {
|
||||
rsocketRequester = rsocketRequesterBuilder.tcp(hostPort[0], Integer.parseInt(hostPort[1]));
|
||||
}
|
||||
RSocketRequester rsocketRequester = (WS_URI_PATTERN.matcher(forwardingUrl).matches())
|
||||
? rsocketRequesterBuilder.websocket(URI.create(forwardingUrl))
|
||||
: rsocketRequesterBuilder.tcp(hostPort[0], Integer.parseInt(hostPort[1]));
|
||||
|
||||
RSocketForwardingFunction rsocketFunction =
|
||||
new RSocketForwardingFunction(function, rsocketRequester, null);
|
||||
@@ -87,7 +86,5 @@ final class FunctionRSocketUtils {
|
||||
((FunctionRegistry) functionCatalog).register(functionRegistration);
|
||||
}
|
||||
}
|
||||
|
||||
return rootFunctionName;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,9 +70,8 @@ class RSocketAutoConfiguration implements ApplicationContextAware {
|
||||
FunctionCatalog functionCatalog, FunctionProperties functionProperties) {
|
||||
String definition = functionProperties.getDefinition();
|
||||
if (StringUtils.hasText(definition)) {
|
||||
FunctionRSocketUtils.registerRSocketForwardingFunctionIfNecessary(definition, functionCatalog, this.applicationContext);
|
||||
//TODO externalize content-type
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(definition, "application/json");
|
||||
FunctionInvocationWrapper function = FunctionRSocketUtils
|
||||
.registerFunctionForDestination(definition, functionCatalog, this.applicationContext);
|
||||
rsocketMessageHandler.registerFunctionHandler(new RSocketListenerFunction(function), definition);
|
||||
rsocketMessageHandler.registerFunctionHandler(new RSocketListenerFunction(function), "");
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ import org.springframework.messaging.support.GenericMessage;
|
||||
* which will use the result of the invocation of such function as an input to another RSocket
|
||||
* effectively composing two functions over RSocket.
|
||||
* <p>
|
||||
* Note: the remote RSocket route is not necessary to be as a Spring Cloud Function binding.
|
||||
* Note: the remote RSocket route is not required to represent Spring Cloud Function binding.
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Artem Bilan
|
||||
|
||||
@@ -40,7 +40,7 @@ import org.springframework.messaging.support.MessageBuilder;
|
||||
*
|
||||
* @since 3.1
|
||||
*/
|
||||
public class RSocketListenerFunction implements Function<Message<Flux<byte[]>>, Publisher<?>> {
|
||||
class RSocketListenerFunction implements Function<Message<Flux<byte[]>>, Publisher<?>> {
|
||||
|
||||
private final FunctionInvocationWrapper targetFunction;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user