diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java index dbe7d5623..e8bf0d38c 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java @@ -33,6 +33,7 @@ import org.reactivestreams.Publisher; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.function.context.catalog.FunctionInspector; import org.springframework.cloud.function.context.message.MessageUtils; +import org.springframework.cloud.function.core.FluxWrapper; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.cloud.function.web.util.HeaderUtils; import org.springframework.http.HttpHeaders; @@ -64,18 +65,22 @@ public class RequestProcessor { @Value("${debug:${DEBUG:false}}") private String debug = "false"; - public RequestProcessor(JsonMapper mapper, FunctionInspector inspector, + public RequestProcessor(FunctionInspector inspector, JsonMapper mapper, StringConverter converter) { this.mapper = mapper; this.inspector = inspector; this.converter = converter; } - public static FunctionWrapper wrapper(Function, Publisher> function, - Consumer> consumer, Supplier> supplier) { + public static FunctionWrapper wrapper(Function, ? extends Publisher> function, + Consumer> consumer, Supplier> supplier) { return new FunctionWrapper(function, consumer, supplier); } + public static FunctionWrapper wrapper(Function, ? extends Publisher> function) { + return new FunctionWrapper(function, null, null); + } + public Mono> get(FunctionWrapper wrapper) { if (wrapper.function() != null) { return response(wrapper, wrapper.function(), @@ -169,8 +174,7 @@ public class RequestProcessor { } private void addHeaders(BodyBuilder builder, Message message) { - HttpHeaders headers = new HttpHeaders(); - builder.headers(HeaderUtils.fromMessage(message.getHeaders(), headers)); + builder.headers(HeaderUtils.fromMessage(message.getHeaders())); } private Mono> stream(FunctionWrapper request, Publisher result) { @@ -221,6 +225,9 @@ public class RequestProcessor { } private boolean isOutputSingle(Object handler) { + if (handler instanceof FluxWrapper) { + handler = ((FluxWrapper) handler).getTarget(); + } Class type = inspector.getOutputType(handler); Class wrapper = inspector.getOutputWrapper(handler); if (Stream.class.isAssignableFrom(type)) { @@ -232,9 +239,9 @@ public class RequestProcessor { } } - private Mono value(Function, Publisher> function, String value) { - Object input = converter.convert(function, value); - return Mono.from(function.apply(Flux.just(input))); + private Publisher value(Function, Publisher> function, Publisher value) { + Flux input = Flux.from(value).map(body -> converter.convert(function, body)); + return Mono.from(function.apply(input)); } public static class FunctionWrapper { @@ -249,13 +256,14 @@ public class RequestProcessor { private HttpHeaders headers = new HttpHeaders(); - private String argument; + private Publisher argument; - public FunctionWrapper(Function, Publisher> function, - Consumer> consumer, Supplier> supplier) { - this.function = function; - this.consumer = consumer; - this.supplier = supplier; + @SuppressWarnings("unchecked") + public FunctionWrapper(Function, ? extends Publisher> function, + Consumer> consumer, Supplier> supplier) { + this.function = (Function, Publisher>) function; + this.consumer = (Consumer>) consumer; + this.supplier = (Supplier>) supplier; } public Object handler() { @@ -292,12 +300,17 @@ public class RequestProcessor { return this; } - public FunctionWrapper argument(String argument) { + public FunctionWrapper argument(Publisher argument) { this.argument = argument; return this; } - public String argument() { + public FunctionWrapper argument(String argument) { + this.argument = Mono.just(argument); + return this; + } + + public Publisher argument() { return this.argument; } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java index 79d7e6086..557988eb6 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java @@ -101,7 +101,9 @@ public class FunctionController { wrapper.headers(request.getRequest().getHeaders()); wrapper.params(request.getRequest().getQueryParams()); String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT); - wrapper.argument(argument); + if (argument != null) { + wrapper.argument(argument); + } return wrapper; } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java index 6bd64e726..2324248fa 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java @@ -31,6 +31,11 @@ import org.springframework.boot.web.reactive.error.DefaultErrorAttributes; import org.springframework.boot.web.reactive.error.ErrorAttributes; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.json.JsonMapper; +import org.springframework.cloud.function.web.BasicStringConverter; +import org.springframework.cloud.function.web.RequestProcessor; +import org.springframework.cloud.function.web.RequestProcessor.FunctionWrapper; +import org.springframework.cloud.function.web.StringConverter; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextInitializer; import org.springframework.context.ApplicationEvent; @@ -38,6 +43,7 @@ import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.SmartApplicationListener; import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.env.Environment; +import org.springframework.http.ResponseEntity; import org.springframework.http.codec.ServerCodecConfigurer; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; @@ -51,7 +57,7 @@ import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import static org.springframework.web.reactive.function.server.RequestPredicates.POST; import static org.springframework.web.reactive.function.server.RouterFunctions.route; -import static org.springframework.web.reactive.function.server.ServerResponse.ok; +import static org.springframework.web.reactive.function.server.ServerResponse.status; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -70,7 +76,9 @@ class FunctionEndpointInitializer @Override public void initialize(GenericApplicationContext context) { if (context.getEnvironment().getProperty("spring.functional.enabled", - Boolean.class, false) && ClassUtils.isPresent("org.springframework.http.server.reactive.HttpHandler", null)) { + Boolean.class, false) + && ClassUtils.isPresent( + "org.springframework.http.server.reactive.HttpHandler", null)) { registerEndpoint(context); registerWebFluxAutoConfiguration(context); } @@ -85,9 +93,17 @@ class FunctionEndpointInitializer } private void registerEndpoint(GenericApplicationContext context) { + context.registerBean(StringConverter.class, + () -> new BasicStringConverter(context.getBean(FunctionInspector.class), + context.getBeanFactory())); + context.registerBean(RequestProcessor.class, + () -> new RequestProcessor(context.getBean(FunctionInspector.class), + context.getBean(JsonMapper.class), + context.getBean(StringConverter.class))); context.registerBean(FunctionEndpointFactory.class, () -> new FunctionEndpointFactory(context.getBean(FunctionCatalog.class), context.getBean(FunctionInspector.class), + context.getBean(RequestProcessor.class), context.getEnvironment())); context.registerBean(RouterFunction.class, () -> context.getBean(FunctionEndpointFactory.class).functionEndpoints()); @@ -183,12 +199,15 @@ class FunctionEndpointFactory { private FunctionInspector inspector; + private RequestProcessor processor; + public FunctionEndpointFactory(FunctionCatalog catalog, FunctionInspector inspector, - Environment environment) { + RequestProcessor processor, Environment environment) { String handler = environment.resolvePlaceholders("${function.handler}"); if (handler.startsWith("$")) { handler = null; } + this.processor = processor; this.inspector = inspector; this.function = extract(catalog, handler); } @@ -212,12 +231,13 @@ class FunctionEndpointFactory { @SuppressWarnings({ "unchecked" }) public RouterFunction functionEndpoints() { return route(POST("/"), request -> { - Class inputType = this.inspector.getInputType(this.function); Class outputType = (Class) this.inspector.getOutputType(this.function); - return ok().body( - Mono.from( - (Flux) this.function.apply(request.bodyToFlux(inputType))), - outputType); + FunctionWrapper wrapper = RequestProcessor.wrapper(function, null, null); + Mono> stream = request.bodyToMono(String.class) + .flatMap(content -> processor.post(wrapper, content, false)); + return stream.flatMap(entity -> status(entity.getStatusCode()) + .headers(headers -> headers.addAll(entity.getHeaders())) + .body(Mono.just((T) entity.getBody()), outputType)); }); } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java index b6a63f30d..632ea5fba 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java @@ -110,7 +110,9 @@ public class FunctionController { } String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT, WebRequest.SCOPE_REQUEST); - wrapper.argument(argument); + if (argument != null) { + wrapper.argument(argument); + } return wrapper; } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/HeaderUtils.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/HeaderUtils.java index d5cf9d0ec..863c142f5 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/HeaderUtils.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/HeaderUtils.java @@ -44,7 +44,7 @@ public class HeaderUtils { REQUEST_ONLY.add(HttpHeaders.HOST, ""); } - public static HttpHeaders fromMessage(MessageHeaders headers, HttpHeaders request) { + public static HttpHeaders fromMessage(MessageHeaders headers) { HttpHeaders result = new HttpHeaders(); for (String name : headers.keySet()) { Object value = headers.get(name); diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/HeadersToMessageTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/HeadersToMessageTests.java new file mode 100644 index 000000000..c13cf26dd --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/HeadersToMessageTests.java @@ -0,0 +1,65 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.function.test; + +import java.util.function.Function; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; +import org.springframework.cloud.function.context.test.FunctionalSpringBootTest; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.reactive.server.WebTestClient; + +import reactor.core.publisher.Mono; + +/** + * + * @author Oleg Zhurakousky + * + */ +@RunWith(SpringRunner.class) +@FunctionalSpringBootTest +@AutoConfigureWebTestClient +public class HeadersToMessageTests { + + @Autowired + private WebTestClient client; + + @Test + public void testBodyAndCustomHeaderFromMessagePropagation() throws Exception { + client.post().uri("/").body(Mono.just("foo"), String.class).exchange() + .expectStatus().isOk().expectHeader() + .valueEquals("x-content-type", "application/xml").expectHeader() + .valueEquals("foo", "bar").expectBody(String.class).isEqualTo("FOO"); + } + + @SpringBootConfiguration + protected static class TestConfiguration + implements Function, Message> { + public Message apply(Message request) { + Message message = MessageBuilder.withPayload(request.getPayload().toUpperCase()) + .setHeader("X-Content-Type", "application/xml") + .setHeader("foo", "bar").build(); + return message; + } + } +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/PojoTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/PojoTests.java new file mode 100644 index 000000000..8389f6b6e --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/PojoTests.java @@ -0,0 +1,84 @@ +/* + * Copyright 2012-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.test; + +import java.util.function.Function; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; +import org.springframework.cloud.function.context.test.FunctionalSpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.reactive.server.WebTestClient; + +import reactor.core.publisher.Mono; + +/** + * @author Dave Syer + * + */ +@RunWith(SpringRunner.class) +@FunctionalSpringBootTest +@AutoConfigureWebTestClient +public class PojoTests { + + @Autowired + private WebTestClient client; + + @Test + public void words() throws Exception { + client.post().uri("/").body(Mono.just("{\"value\":\"foo\"}"), String.class).exchange() + .expectStatus().isOk().expectBody(String.class).isEqualTo("{\"value\":\"FOO\"}"); + } + + @SpringBootConfiguration + protected static class TestConfiguration implements Function { + @Override + public Foo apply(Foo value) { + return new Foo(value.getValue().toUpperCase()); + } + } +} + +class Foo { + + private String value; + + public Foo() { + } + + public Foo(String value) { + this.value = value; + } + + public String getValue() { + return this.value; + } + + public void setValue(String value) { + this.value = value; + } + + @Override + public String toString() { + return "Foo [value=" + this.value + "]"; + } + +} \ No newline at end of file