diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java index 9f50a8318..4c43e3280 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java @@ -69,7 +69,7 @@ public class RSocketAutoConfiguration { private final RSocketFunctionProperties rSocketFunctionProperties; - private RSocketFunction invocableFunction; + private RSocketListenerFunction invocableFunction; private GenericApplicationContext context; @@ -82,7 +82,7 @@ public class RSocketAutoConfiguration { @Override public void afterPropertiesSet() throws Exception { String definition = this.functionProperties.getDefinition(); - this.registerRsocketProxiesIfNecessary(definition); + this.registerRsocketForwardingFunctionIfNecessary(definition); //TODO externalize content-type FunctionInvocationWrapper function = functionCatalog.lookup(definition, "application/json"); if (function.isSupplier()) { @@ -93,21 +93,20 @@ public class RSocketAutoConfiguration { .createUnresolved(this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort()); if (this.invocableFunction == null) { - this.invocableFunction = new RSocketFunction(function, bindAddress); + this.invocableFunction = new RSocketListenerFunction(function, bindAddress); this.invocableFunction.start(); } } @SuppressWarnings({ "rawtypes", "unchecked" }) - private void registerRsocketProxiesIfNecessary(String definition) { + private void registerRsocketForwardingFunctionIfNecessary(String definition) { String[] names = StringUtils.delimitedListToStringArray(definition.replaceAll(",", "|").trim(), "|"); -// InetSocketAddress listenAddress = InetSocketAddress -// .createUnresolved(this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort()); - - for (String name : names) { if (!this.context.containsBean(name)) { // this means RSocket + if (logger.isDebugEnabled()) { + logger.debug("Registering rsocket forwarder for '" + name + "' function."); + } String[] functionToRSocketDefinition = StringUtils.delimitedListToStringArray(name, ">"); Assert.isTrue(functionToRSocketDefinition.length == 2, "Must only contain one output redirect"); FunctionInvocationWrapper function = functionCatalog.lookup(functionToRSocketDefinition[0], "application/json"); @@ -119,11 +118,8 @@ public class RSocketAutoConfiguration { RSocketForwardingFunction rsocketFunction = new RSocketForwardingFunction(function, outputAddress); FunctionRegistration functionRegistration = new FunctionRegistration(rsocketFunction, name); - functionRegistration.type(FunctionTypeUtils.discoverFunctionTypeFromClass(RSocketFunction.class)); + functionRegistration.type(FunctionTypeUtils.discoverFunctionTypeFromClass(RSocketListenerFunction.class)); ((FunctionRegistry) this.functionCatalog).register(functionRegistration); -// -// this.invocableFunction = rsocketFunction; -// this.invocableFunction.start(); } } } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java index 3762d6e54..cb76b93ba 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java @@ -1,3 +1,19 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.cloud.function.rsocket; import java.net.InetSocketAddress; @@ -5,21 +21,28 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.function.Function; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; -import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; -import org.springframework.lang.Nullable; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; - import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.core.RSocketConnector; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.util.DefaultPayload; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; import reactor.util.retry.Retry; +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + + + +/** + * + * @author Oleg Zhurakousky + * @since 3.1 + * + */ class RSocketForwardingFunction implements Function, Publisher>> { private static Log logger = LogFactory.getLog(RSocketForwardingFunction.class); @@ -36,6 +59,7 @@ class RSocketForwardingFunction implements Function, Publisher> apply(Message input) { if (logger.isDebugEnabled()) { diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java index c7c5419fd..d7848679f 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunctionProperties.java @@ -33,10 +33,6 @@ public class RSocketFunctionProperties { private int bindPort = 55555; -// private String targetAddress; -// -// private int targetPort; - public String getBindAddress() { return bindAddress; } @@ -52,20 +48,4 @@ public class RSocketFunctionProperties { public void setBindPort(int bindPort) { this.bindPort = bindPort; } - -// public String getTargetAddress() { -// return targetAddress; -// } -// -// public void setTargetAddress(String targetAddress) { -// this.targetAddress = targetAddress; -// } - -// public int getTargetPort() { -// return targetPort; -// } -// -// public void setTargetPort(int targetPort) { -// this.targetPort = targetPort; -// } } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java similarity index 94% rename from spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunction.java rename to spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java index 927e90caa..c55fccc9d 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketFunction.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java @@ -44,7 +44,7 @@ import org.springframework.messaging.support.MessageBuilder; * @author Oleg Zhurakousky * @since 3.1 */ -class RSocketFunction implements Function, Publisher>> { +public class RSocketListenerFunction implements Function, Publisher>> { private static String splash = " ____ _ _______ __ ____ __ _ ___ ____ __ __ \n" + " / __/__ ____(_)__ ___ _ / ___/ /__ __ _____/ / / __/_ _____ ____/ /_(_)__ ___ / _ \\/ __/__ ____/ /_____ / /_\n" + @@ -53,7 +53,7 @@ class RSocketFunction implements Function, Publisher, Publisher, Publisher