diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java new file mode 100644 index 000000000..88e45c13f --- /dev/null +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java @@ -0,0 +1,124 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.rsocket; + +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; +import reactor.test.StepVerifier; + +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.function.context.config.RoutingFunction; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.SocketUtils; + +/** + * + * @author Oleg Zhurakousky + * @since 3.1 + */ +public class RSocketAutoConfigurationRoutingTests { + @Test + public void testImperativeFunctionAsRequestReplyWithDefinition() { + int port = SocketUtils.findAvailableTcpPort(); + try ( + ConfigurableApplicationContext applicationContext = + new SpringApplicationBuilder(SampleFunctionConfiguration.class) + .web(WebApplicationType.NONE) + .run("--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.cloud.function.routing-expression=headers.func_name", + "--spring.cloud.function.expected-content-type=text/plain", + "--spring.rsocket.server.port=" + port); + ) { + RSocketRequester.Builder rsocketRequesterBuilder = + applicationContext.getBean(RSocketRequester.Builder.class); + + rsocketRequesterBuilder.tcp("localhost", port) + .route(RoutingFunction.FUNCTION_NAME) + .metadata("{\"func_name\":\"echo\"}", MimeTypeUtils.APPLICATION_JSON) + .data("hello") + .retrieveMono(String.class) + .as(StepVerifier::create) + .expectNext("hello") + .expectComplete() + .verify(); + + rsocketRequesterBuilder.tcp("localhost", port) + .route(RoutingFunction.FUNCTION_NAME) + .metadata("{\"func_name\":\"uppercase\"}", MimeTypeUtils.APPLICATION_JSON) + .data("hello") + .retrieveMono(String.class) + .as(StepVerifier::create) + .expectNext("HELLO") + .expectComplete() + .verify(); + } + } + + + @EnableAutoConfiguration + @Configuration + public static class SampleFunctionConfiguration { + + final Sinks.One consumerData = Sinks.one(); + + @Bean + public Function uppercase() { + return v -> v.toUpperCase(); + } + + @Bean + public Function concat() { + return v -> v + v; + } + + @Bean + public Function echo() { + return v -> v; + } + + @Bean + public Function, Flux> uppercaseReactive() { + return flux -> flux.map(v -> { + System.out.println("Uppercasing: " + v); + return v.toUpperCase(); + }); + } + + @Bean + public Consumer log() { + return this.consumerData::tryEmitValue; + } + + @Bean + public Supplier source() { + return () -> "test data"; + } + + } + +}