Split RSocketFuntion

Split RSocketFunction into RSocketForwardingFunction and RSocketListenerFunction to ensure function composition over rsocket works the same way as with simple functions.
This commit is contained in:
Oleg Zhurakousky
2020-07-16 19:26:41 +02:00
parent 638c98cfb7
commit 4547580d9d
4 changed files with 44 additions and 45 deletions

View File

@@ -69,7 +69,7 @@ public class RSocketAutoConfiguration {
private final RSocketFunctionProperties rSocketFunctionProperties;
private RSocketFunction invocableFunction;
private RSocketListenerFunction invocableFunction;
private GenericApplicationContext context;
@@ -82,7 +82,7 @@ public class RSocketAutoConfiguration {
@Override
public void afterPropertiesSet() throws Exception {
String definition = this.functionProperties.getDefinition();
this.registerRsocketProxiesIfNecessary(definition);
this.registerRsocketForwardingFunctionIfNecessary(definition);
//TODO externalize content-type
FunctionInvocationWrapper function = functionCatalog.lookup(definition, "application/json");
if (function.isSupplier()) {
@@ -93,21 +93,20 @@ public class RSocketAutoConfiguration {
.createUnresolved(this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort());
if (this.invocableFunction == null) {
this.invocableFunction = new RSocketFunction(function, bindAddress);
this.invocableFunction = new RSocketListenerFunction(function, bindAddress);
this.invocableFunction.start();
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void registerRsocketProxiesIfNecessary(String definition) {
private void registerRsocketForwardingFunctionIfNecessary(String definition) {
String[] names = StringUtils.delimitedListToStringArray(definition.replaceAll(",", "|").trim(), "|");
// InetSocketAddress listenAddress = InetSocketAddress
// .createUnresolved(this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort());
for (String name : names) {
if (!this.context.containsBean(name)) { // this means RSocket
if (logger.isDebugEnabled()) {
logger.debug("Registering rsocket forwarder for '" + name + "' function.");
}
String[] functionToRSocketDefinition = StringUtils.delimitedListToStringArray(name, ">");
Assert.isTrue(functionToRSocketDefinition.length == 2, "Must only contain one output redirect");
FunctionInvocationWrapper function = functionCatalog.lookup(functionToRSocketDefinition[0], "application/json");
@@ -119,11 +118,8 @@ public class RSocketAutoConfiguration {
RSocketForwardingFunction rsocketFunction = new RSocketForwardingFunction(function, outputAddress);
FunctionRegistration functionRegistration = new FunctionRegistration(rsocketFunction, name);
functionRegistration.type(FunctionTypeUtils.discoverFunctionTypeFromClass(RSocketFunction.class));
functionRegistration.type(FunctionTypeUtils.discoverFunctionTypeFromClass(RSocketListenerFunction.class));
((FunctionRegistry) this.functionCatalog).register(functionRegistration);
//
// this.invocableFunction = rsocketFunction;
// this.invocableFunction.start();
}
}
}

View File

@@ -1,3 +1,19 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.rsocket;
import java.net.InetSocketAddress;
@@ -5,21 +21,28 @@ import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.util.retry.Retry;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
/**
*
* @author Oleg Zhurakousky
* @since 3.1
*
*/
class RSocketForwardingFunction implements Function<Message<byte[]>, Publisher<Message<byte[]>>> {
private static Log logger = LogFactory.getLog(RSocketForwardingFunction.class);
@@ -36,6 +59,7 @@ class RSocketForwardingFunction implements Function<Message<byte[]>, Publisher<M
.block();
}
@SuppressWarnings("unchecked")
@Override
public Publisher<Message<byte[]>> apply(Message<byte[]> input) {
if (logger.isDebugEnabled()) {

View File

@@ -33,10 +33,6 @@ public class RSocketFunctionProperties {
private int bindPort = 55555;
// private String targetAddress;
//
// private int targetPort;
public String getBindAddress() {
return bindAddress;
}
@@ -52,20 +48,4 @@ public class RSocketFunctionProperties {
public void setBindPort(int bindPort) {
this.bindPort = bindPort;
}
// public String getTargetAddress() {
// return targetAddress;
// }
//
// public void setTargetAddress(String targetAddress) {
// this.targetAddress = targetAddress;
// }
// public int getTargetPort() {
// return targetPort;
// }
//
// public void setTargetPort(int targetPort) {
// this.targetPort = targetPort;
// }
}

View File

@@ -44,7 +44,7 @@ import org.springframework.messaging.support.MessageBuilder;
* @author Oleg Zhurakousky
* @since 3.1
*/
class RSocketFunction implements Function<Message<byte[]>, Publisher<Message<byte[]>>> {
public class RSocketListenerFunction implements Function<Message<byte[]>, Publisher<Message<byte[]>>> {
private static String splash = " ____ _ _______ __ ____ __ _ ___ ____ __ __ \n" +
" / __/__ ____(_)__ ___ _ / ___/ /__ __ _____/ / / __/_ _____ ____/ /_(_)__ ___ / _ \\/ __/__ ____/ /_____ / /_\n" +
@@ -53,7 +53,7 @@ class RSocketFunction implements Function<Message<byte[]>, Publisher<Message<byt
" /_/ /___/ \n" +
"";
private static Log logger = LogFactory.getLog(RSocketFunction.class);
private static Log logger = LogFactory.getLog(RSocketListenerFunction.class);
private final InetSocketAddress listenAddress;
@@ -61,7 +61,7 @@ class RSocketFunction implements Function<Message<byte[]>, Publisher<Message<byt
private Disposable rsocketConnection;
RSocketFunction(FunctionInvocationWrapper targetFunction, InetSocketAddress listenAddress) {
RSocketListenerFunction(FunctionInvocationWrapper targetFunction, InetSocketAddress listenAddress) {
this.listenAddress = listenAddress;
this.targetFunction = targetFunction;
}
@@ -179,9 +179,8 @@ class RSocketFunction implements Function<Message<byte[]>, Publisher<Message<byt
private void printSplashScreen(String definition, Type type) {
System.out.println(splash);
System.out.println("Function Definition: " + definition + ":[" + type + "]");
System.out.println("Function Definition: " + definition + "; T[" + type + "]");
System.out.println("RSocket Listen Address: " + this.listenAddress);
// System.out.println("RSocket Target Address: " + this.outputAddress);
System.out.println("======================================================\n");
}