Initial rsocket-broker sample
This commit is contained in:
@@ -15,6 +15,10 @@
|
||||
<version>3.1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
<rsocket-routing.version>0.0.2-SNAPSHOT</rsocket-routing.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
@@ -35,19 +39,31 @@
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
<artifactId>spring-boot-configuration-processor</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-configuration-processor</artifactId>
|
||||
<optional>true</optional>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.rsocket.routing</groupId>
|
||||
<artifactId>rsocket-routing-client-spring</artifactId>
|
||||
<version>${rsocket-routing.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.rsocket.routing</groupId>
|
||||
<artifactId>rsocket-routing-broker-spring</artifactId>
|
||||
<version>${rsocket-routing.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
||||
@@ -18,11 +18,16 @@ package org.springframework.cloud.function.rsocket;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.SocketAcceptor;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.FunctionProperties;
|
||||
@@ -36,28 +41,39 @@ import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.messaging.rsocket.RSocketConnectorConfigurer;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Main configuration class for components required to support RSocket integration with spring-cloud-function.
|
||||
* Main configuration class for components required to support RSocket integration with
|
||||
* spring-cloud-function.
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @since 3.1
|
||||
*/
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@EnableConfigurationProperties({FunctionProperties.class, RSocketFunctionProperties.class})
|
||||
@EnableConfigurationProperties({ FunctionProperties.class, RSocketFunctionProperties.class })
|
||||
@ConditionalOnProperty(name = FunctionProperties.PREFIX + ".rsocket.enabled", matchIfMissing = true)
|
||||
@AutoConfigureBefore(name = "io.rsocket.routing.client.spring.RoutingClientAutoConfiguration")
|
||||
public class RSocketAutoConfiguration {
|
||||
|
||||
private static Log logger = LogFactory.getLog(RSocketAutoConfiguration.class);
|
||||
|
||||
|
||||
@Bean
|
||||
public FunctionToRSocketBinder functionToDestinationBinder(FunctionCatalog functionCatalog,
|
||||
FunctionProperties functionProperties, RSocketFunctionProperties rSocketFunctionProperties) {
|
||||
return new FunctionToRSocketBinder(functionCatalog, functionProperties, rSocketFunctionProperties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnClass(name = "io.rsocket.routing.client.spring.RoutingClientAutoConfiguration")
|
||||
//TODO: move to broker specific auto config
|
||||
public RSocketConnectorConfigurer functionRSocketConnectorConfigurer(
|
||||
FunctionToRSocketBinder binder) {
|
||||
return connector -> connector.acceptor(SocketAcceptor.with(binder.getRSocket()));
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@@ -81,6 +97,7 @@ public class RSocketAutoConfiguration {
|
||||
this.functionProperties = functionProperties;
|
||||
this.rSocketFunctionProperties = rSocketFunctionProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
String definition = this.functionProperties.getDefinition();
|
||||
@@ -92,16 +109,28 @@ public class RSocketAutoConfiguration {
|
||||
}
|
||||
Assert.isTrue(StringUtils.hasText(definition), "Failed to determine target function for RSocket.");
|
||||
this.registerRsocketForwardingFunctionIfNecessary(definition);
|
||||
//TODO externalize content-type
|
||||
// TODO externalize content-type
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(definition, "application/json");
|
||||
if (function.isSupplier()) {
|
||||
throw new UnsupportedOperationException("Supplier is not currently supported for RSocket interaction");
|
||||
}
|
||||
|
||||
InetSocketAddress bindAddress = InetSocketAddress
|
||||
.createUnresolved(this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort());
|
||||
if (StringUtils.hasText(rSocketFunctionProperties.getBindAddress())
|
||||
&& rSocketFunctionProperties.getBindPort() != null) {
|
||||
InetSocketAddress bindAddress = InetSocketAddress.createUnresolved(
|
||||
this.rSocketFunctionProperties.getBindAddress(), this.rSocketFunctionProperties.getBindPort());
|
||||
this.invocableFunction = new RSocketListenerFunction(function, bindAddress);
|
||||
}
|
||||
else {
|
||||
this.invocableFunction = new RSocketListenerFunction(function, null);
|
||||
}
|
||||
}
|
||||
|
||||
this.invocableFunction = new RSocketListenerFunction(function, bindAddress);
|
||||
RSocket getRSocket() {
|
||||
if (this.invocableFunction == null) {
|
||||
return null;
|
||||
}
|
||||
return this.invocableFunction.getRsocket();
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@@ -115,16 +144,18 @@ public class RSocketAutoConfiguration {
|
||||
}
|
||||
String[] functionToRSocketDefinition = StringUtils.delimitedListToStringArray(name, ">");
|
||||
Assert.isTrue(functionToRSocketDefinition.length == 2, "Must only contain one output redirect");
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(functionToRSocketDefinition[0], "application/json");
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(functionToRSocketDefinition[0],
|
||||
"application/json");
|
||||
|
||||
String[] hostPort = StringUtils.delimitedListToStringArray(functionToRSocketDefinition[1], ":");
|
||||
InetSocketAddress outputAddress = InetSocketAddress
|
||||
.createUnresolved(hostPort[0], Integer.valueOf(hostPort[1]));
|
||||
InetSocketAddress outputAddress = InetSocketAddress.createUnresolved(hostPort[0],
|
||||
Integer.valueOf(hostPort[1]));
|
||||
|
||||
RSocketForwardingFunction rsocketFunction = new RSocketForwardingFunction(function, outputAddress);
|
||||
FunctionRegistration functionRegistration = new FunctionRegistration(rsocketFunction, name);
|
||||
|
||||
functionRegistration.type(FunctionTypeUtils.discoverFunctionTypeFromClass(RSocketListenerFunction.class));
|
||||
functionRegistration
|
||||
.type(FunctionTypeUtils.discoverFunctionTypeFromClass(RSocketListenerFunction.class));
|
||||
((FunctionRegistry) this.functionCatalog).register(functionRegistration);
|
||||
}
|
||||
}
|
||||
@@ -141,15 +172,19 @@ public class RSocketAutoConfiguration {
|
||||
this.invocableFunction.start();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (this.isRunning() && this.invocableFunction != null) {
|
||||
this.invocableFunction.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return this.started;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -29,9 +29,19 @@ import org.springframework.cloud.function.context.FunctionProperties;
|
||||
@ConfigurationProperties(prefix = FunctionProperties.PREFIX + ".rsocket")
|
||||
public class RSocketFunctionProperties {
|
||||
|
||||
private String bindAddress = "localhost";
|
||||
private boolean enabled;
|
||||
|
||||
private int bindPort = 55555;
|
||||
private String bindAddress;
|
||||
|
||||
private Integer bindPort;
|
||||
|
||||
public boolean isEnabled() {
|
||||
return this.enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public String getBindAddress() {
|
||||
return bindAddress;
|
||||
@@ -41,11 +51,11 @@ public class RSocketFunctionProperties {
|
||||
this.bindAddress = bindAddress;
|
||||
}
|
||||
|
||||
public int getBindPort() {
|
||||
public Integer getBindPort() {
|
||||
return bindPort;
|
||||
}
|
||||
|
||||
public void setBindPort(int bindPort) {
|
||||
public void setBindPort(Integer bindPort) {
|
||||
this.bindPort = bindPort;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,6 +60,7 @@ public class RSocketListenerFunction implements Function<Message<byte[]>, Publis
|
||||
private final FunctionInvocationWrapper targetFunction;
|
||||
|
||||
private Disposable rsocketConnection;
|
||||
private RSocket rsocket;
|
||||
|
||||
RSocketListenerFunction(FunctionInvocationWrapper targetFunction, InetSocketAddress listenAddress) {
|
||||
this.listenAddress = listenAddress;
|
||||
@@ -80,7 +81,9 @@ public class RSocketListenerFunction implements Function<Message<byte[]>, Publis
|
||||
void start() {
|
||||
Type functionType = this.targetFunction.getFunctionType();
|
||||
|
||||
RSocket rsocket = buildRSocket(this.targetFunction.getFunctionDefinition(), functionType, this);
|
||||
if (rsocket == null) {
|
||||
rsocket = buildRSocket(this.targetFunction, functionType, this);
|
||||
}
|
||||
if (this.listenAddress != null) {
|
||||
this.rsocketConnection = RSocketConnectionUtils.createServerSocket(rsocket, this.listenAddress);
|
||||
this.printSplashScreen(this.targetFunction.getFunctionDefinition(), functionType);
|
||||
@@ -93,7 +96,15 @@ public class RSocketListenerFunction implements Function<Message<byte[]>, Publis
|
||||
}
|
||||
}
|
||||
|
||||
private RSocket buildRSocket(String definition, Type functionType, Function<Message<byte[]>, Publisher<Message<byte[]>>> function) {
|
||||
public RSocket getRsocket() {
|
||||
if (this.rsocket == null) {
|
||||
start();
|
||||
}
|
||||
return this.rsocket;
|
||||
}
|
||||
|
||||
private RSocket buildRSocket(FunctionInvocationWrapper targetFunction, Type functionType, Function<Message<byte[]>, Publisher<Message<byte[]>>> function) {
|
||||
String definition = targetFunction.getFunctionDefinition();
|
||||
RSocket clientRSocket = new RSocket() { // imperative function or Function<?, Mono> = requestResponse
|
||||
@Override
|
||||
public Mono<Payload> requestResponse(Payload payload) {
|
||||
|
||||
@@ -0,0 +1,224 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.rsocket;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.CharBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.core.RSocketConnector;
|
||||
import io.rsocket.frame.decoder.PayloadDecoder;
|
||||
import io.rsocket.metadata.CompositeMetadataCodec;
|
||||
import io.rsocket.routing.broker.spring.MimeTypes;
|
||||
import io.rsocket.routing.common.Id;
|
||||
import io.rsocket.routing.common.Tags;
|
||||
import io.rsocket.routing.common.WellKnownKey;
|
||||
import io.rsocket.routing.frames.AddressFlyweight;
|
||||
import io.rsocket.routing.frames.RouteSetupFlyweight;
|
||||
import io.rsocket.transport.netty.client.TcpClientTransport;
|
||||
import io.rsocket.util.DefaultPayload;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.rsocket.RSocketStrategies;
|
||||
import org.springframework.util.SocketUtils;
|
||||
|
||||
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
|
||||
import static io.rsocket.routing.broker.spring.MimeTypes.COMPOSITE_MIME_TYPE;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @since 3.1
|
||||
*/
|
||||
public class RoutingBrokerTests {
|
||||
|
||||
@Test
|
||||
public void testImperativeFunctionAsRequestReply() throws Exception {
|
||||
|
||||
ConfigurableApplicationContext functionContext = null;
|
||||
ConfigurableApplicationContext brokerContext = null;
|
||||
try {
|
||||
int routingBrokerProxyPort = SocketUtils.findAvailableTcpPort();
|
||||
int routingBrokerClusterPort = SocketUtils.findAvailableTcpPort();
|
||||
|
||||
// start broker
|
||||
brokerContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.io.rsocket.routing.broker=TRACE",
|
||||
"--spring.cloud.function.rsocket.enabled=false",
|
||||
"--io.rsocket.routing.client.enabled=false",
|
||||
"--io.rsocket.routing.broker.enabled=true",
|
||||
"--io.rsocket.routing.broker.tcp.port=" + routingBrokerProxyPort,
|
||||
"--io.rsocket.routing.broker.cluster.port=" + routingBrokerClusterPort);
|
||||
|
||||
// start function connecting to broker, service-name=toupper
|
||||
functionContext = new SpringApplicationBuilder(SampleFunctionConfiguration.class)
|
||||
.web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--io.rsocket.routing.client.enabled=true",
|
||||
"--io.rsocket.routing.client.service-name=toupper",
|
||||
"--io.rsocket.routing.client.brokers[0].host=localhost",
|
||||
"--io.rsocket.routing.client.brokers[0].port=" + routingBrokerProxyPort,
|
||||
"--io.rsocket.routing.broker.enabled=false",
|
||||
"--spring.cloud.function.definition=uppercase");
|
||||
|
||||
// setup metadata to identify the this test connecting to broker.
|
||||
RSocketStrategies strategies = functionContext.getBean(RSocketStrategies.class);
|
||||
Id testerId = Id.random();
|
||||
ByteBuf routeSetup = encodeRouteSetup(strategies, testerId, "tester");
|
||||
Payload setupPayload = DefaultPayload.create(EMPTY_BUFFER, routeSetup);
|
||||
|
||||
// connect to broker
|
||||
RSocket socket = RSocketConnector.create().payloadDecoder(PayloadDecoder.ZERO_COPY)
|
||||
.metadataMimeType(COMPOSITE_MIME_TYPE.toString())
|
||||
.setupPayload(setupPayload)
|
||||
.connect(TcpClientTransport.create(routingBrokerProxyPort))
|
||||
.block();
|
||||
|
||||
// setup data for request to toupper service
|
||||
ByteBuffer data = StandardCharsets.UTF_8.encode(CharBuffer.wrap("\"hello\""));
|
||||
// setup metadata for request to toupper service
|
||||
ByteBuf routingMetadata = encodeAddress(strategies, testerId, "toupper");
|
||||
Payload payload = DefaultPayload.create(data, routingMetadata.nioBuffer());
|
||||
// call toupper service
|
||||
Mono<String> result = socket.requestResponse(payload).map(Payload::getDataUtf8);
|
||||
|
||||
StepVerifier
|
||||
.create(result)
|
||||
.expectNext("\"HELLO\"")
|
||||
.expectComplete()
|
||||
.verify();
|
||||
} finally {
|
||||
if (functionContext != null) {
|
||||
functionContext.close();
|
||||
}
|
||||
if (brokerContext != null) {
|
||||
brokerContext.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static ByteBuf encodeAddress(RSocketStrategies strategies, Id originRouteId, String serviceName) {
|
||||
Tags tags = Tags.builder().with(WellKnownKey.SERVICE_NAME, serviceName)
|
||||
.buildTags();
|
||||
ByteBuf address = AddressFlyweight
|
||||
.encode(ByteBufAllocator.DEFAULT, originRouteId, Tags.empty(), tags);
|
||||
|
||||
CompositeByteBuf composite = encodeComposite(address, MimeTypes.ROUTING_FRAME_MIME_TYPE
|
||||
.toString());
|
||||
return composite;
|
||||
}
|
||||
|
||||
private static ByteBuf encodeRouteSetup(RSocketStrategies strategies, Id routeId, String serviceName) {
|
||||
Tags tags = Tags.builder()
|
||||
.with("current-time", String.valueOf(System.currentTimeMillis()))
|
||||
.with(WellKnownKey.TIME_ZONE, System.currentTimeMillis() + "")
|
||||
.buildTags();
|
||||
ByteBuf routeSetup = RouteSetupFlyweight
|
||||
.encode(ByteBufAllocator.DEFAULT, routeId, serviceName, tags);
|
||||
|
||||
CompositeByteBuf composite = encodeComposite(routeSetup, MimeTypes.ROUTING_FRAME_MIME_TYPE
|
||||
.toString());
|
||||
return composite;
|
||||
}
|
||||
|
||||
private static CompositeByteBuf encodeComposite(ByteBuf byteBuf, String mimeType) {
|
||||
CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer();
|
||||
CompositeMetadataCodec
|
||||
.encodeAndAddMetadata(composite, ByteBufAllocator.DEFAULT,
|
||||
mimeType, byteBuf);
|
||||
return composite;
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
public static class RoutingBrokerConfiguration {
|
||||
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
public static class SampleFunctionConfiguration {
|
||||
@Bean
|
||||
public Function<String, String> uppercase() {
|
||||
return v -> {
|
||||
return v.toUpperCase();
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<String, String> concat() {
|
||||
return v -> {
|
||||
return v + v;
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<String, String> echo() {
|
||||
return v -> v;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<Flux<String>, Flux<String>> uppercaseReactive() {
|
||||
return flux -> flux.map(v -> {
|
||||
System.out.println("Uppercasing: " + v);
|
||||
return v.toUpperCase();
|
||||
});
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Consumer<byte[]> log() {
|
||||
return v -> {
|
||||
System.out.println("==> In Consumer: " + new String(v));
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
public static class AdditionalFunctionConfiguration {
|
||||
@Bean
|
||||
public Function<String, String> reverse() {
|
||||
return v -> {
|
||||
return new StringBuilder(v).reverse().toString();
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<String, String> wrap() {
|
||||
return v -> {
|
||||
return "(" + v + ")";
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
io.rsocket.routing.broker.enabled=false
|
||||
io.rsocket.routing.client.enabled=false
|
||||
Reference in New Issue
Block a user