diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java index fe6fbf3ef..c295d7dc1 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java @@ -36,9 +36,9 @@ import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; +import org.springframework.cloud.function.json.JsonMapper; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; -import org.springframework.context.SmartLifecycle; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.support.GenericApplicationContext; @@ -61,8 +61,8 @@ class RSocketAutoConfiguration { @Bean public FunctionToRSocketBinder functionToDestinationBinder(FunctionCatalog functionCatalog, - FunctionProperties functionProperties) { - return new FunctionToRSocketBinder(functionCatalog, functionProperties); + FunctionProperties functionProperties, JsonMapper jsonMapper) { + return new FunctionToRSocketBinder(functionCatalog, functionProperties, jsonMapper); } @Bean @@ -76,21 +76,22 @@ class RSocketAutoConfiguration { /** * */ - static class FunctionToRSocketBinder implements InitializingBean, ApplicationContextAware, SmartLifecycle { + static class FunctionToRSocketBinder implements InitializingBean, ApplicationContextAware { private final FunctionCatalog functionCatalog; private final FunctionProperties functionProperties; + private final JsonMapper jsonMapper; + private RSocketListenerFunction invocableFunction; private GenericApplicationContext context; - private boolean started; - - FunctionToRSocketBinder(FunctionCatalog functionCatalog, FunctionProperties functionProperties) { + FunctionToRSocketBinder(FunctionCatalog functionCatalog, FunctionProperties functionProperties, JsonMapper jsonMapper) { this.functionCatalog = functionCatalog; this.functionProperties = functionProperties; + this.jsonMapper = jsonMapper; } @Override @@ -110,7 +111,7 @@ class RSocketAutoConfiguration { throw new UnsupportedOperationException("Supplier is not currently supported for RSocket interaction"); } - this.invocableFunction = new RSocketListenerFunction(function); + this.invocableFunction = new RSocketListenerFunction(function, this.jsonMapper); } RSocket getRSocket() { @@ -152,26 +153,6 @@ class RSocketAutoConfiguration { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.context = (GenericApplicationContext) applicationContext; } - - @Override - public void start() { - if (!this.isRunning() && this.invocableFunction != null) { - this.invocableFunction.start(); - } - } - - @Override - public void stop() { - if (this.isRunning() && this.invocableFunction != null) { - this.invocableFunction.stop(); - } - } - - @Override - public boolean isRunning() { - return this.started; - } - } } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java index 489d3253d..99dbb759c 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketForwardingFunction.java @@ -38,6 +38,16 @@ import org.springframework.messaging.support.MessageBuilder; /** + * + * An implementation of {@link Function} to support distributed function composition. + *
+ * This function wraps target function and forwards the result of + * the invocation of the target function to another RSocket returning the result of such forwarding as {@link Publisher}. + *

+ * A typical example is `spring.cloud.function.definition=uppercase>localhost:8888'. + *
+ * In this case 'uppercase' is targetFunction which will be invoked during the call to 'apply' and the result of + * this invocation sent to RSocket reachable at localhost:8888. * * @author Oleg Zhurakousky * @author Artem Bilan diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java index ae61989bc..1e0ab0770 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java @@ -18,6 +18,7 @@ package org.springframework.cloud.function.rsocket; import java.lang.reflect.Type; import java.nio.ByteBuffer; +import java.util.Map; import java.util.function.Function; import io.rsocket.Payload; @@ -26,14 +27,15 @@ import io.rsocket.util.DefaultPayload; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; -import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; +import org.springframework.cloud.function.json.JsonMapper; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.CollectionUtils; /** * Wrapper over an instance of target Function (represented by {@link FunctionInvocationWrapper}) @@ -56,11 +58,13 @@ class RSocketListenerFunction implements Function, Publisher, Publisher>) rawResult : Mono.just((Message) rawResult); } - void start() { - Type functionType = this.targetFunction.getFunctionType(); - - if (rsocket == null) { - rsocket = buildRSocket(this.targetFunction, functionType, this); - } - this.printSplashScreen(this.targetFunction.getFunctionDefinition(), functionType); - } - - void stop() { - if (this.rsocketConnection != null) { - this.rsocketConnection.dispose(); - } - } - public RSocket getRsocket() { if (this.rsocket == null) { - start(); + Type functionType = this.targetFunction.getFunctionType(); + + if (this.rsocket == null) { + this.rsocket = this.buildRSocket(this.targetFunction, functionType, this); + } + this.printSplashScreen(this.targetFunction.getFunctionDefinition(), functionType); } return this.rsocket; } @@ -112,7 +106,7 @@ class RSocketListenerFunction implements Function, Publisher inputMessage = deserealizePayload(payload); Mono> result = Mono.from(function.apply(inputMessage)); - return result.map(message -> DefaultPayload.create(message.getPayload())); + return result.map(message -> DefaultPayload.create(message.getPayload(), jsonMapper.toJson(message.getHeaders()))); } } @@ -167,15 +161,28 @@ class RSocketListenerFunction implements Function, Publisher deserealizePayload(Payload payload) { + @SuppressWarnings({ "rawtypes", "unchecked" }) + private Message deserealizePayload(Payload payload) { ByteBuffer buffer = payload.getData(); byte[] rawData = new byte[buffer.remaining()]; buffer.get(rawData); + Map headers = null; if (payload.hasMetadata()) { - String metadata = payload.getMetadataUtf8(); // TODO see what to do with it + try { + ByteBuffer metadata = payload.getMetadata(); + byte[] metadataBytes = new byte[metadata.remaining()]; + metadata.get(metadataBytes); + headers = this.jsonMapper.fromJson(metadataBytes, Map.class); + } + catch (Exception e) { + //throw new IllegalStateException(e); + logger.warn("Failed to extract headers from metadata", e); + } + } + MessageBuilder builder = MessageBuilder.withPayload(rawData); + if (!CollectionUtils.isEmpty(headers)) { + builder.copyHeaders(headers); } - MessageBuilder builder = MessageBuilder.withPayload(rawData); Message inputMessage = builder.build(); return inputMessage; diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java index 6f634df9c..be09b54ae 100644 --- a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java @@ -34,9 +34,9 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.util.Assert; import org.springframework.util.SocketUtils; - /** * * @author Oleg Zhurakousky @@ -61,6 +61,28 @@ public class RSocketAutoConfigurationTests { .verify(); } + @Test + public void testImperativeFunctionAsRequestReplyWithMetadata() throws Exception { + int port = SocketUtils.findAvailableTcpPort(); + ApplicationContext context = new SpringApplicationBuilder(SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run( + "--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.cloud.function.definition=uppercase", + "--spring.rsocket.server.port=" + port); + + RSocketRequester requester = context.getBean(RSocketRequester.class); + Mono result = requester.rsocket().requestResponse(DefaultPayload.create("\"hello\"", "{\"name\":\"bob\", \"age\":23}")) + .map(payload -> { + Assert.hasText(payload.getMetadataUtf8(), "Metadata must not be null"); + return payload.getDataUtf8(); + }); + + StepVerifier + .create(result) + .expectNext("\"HELLO\"") + .expectComplete() + .verify(); + } + @Test public void testImperativeFunctionAsRequestStream() throws Exception { int port = SocketUtils.findAvailableTcpPort();