diff --git a/pom.xml b/pom.xml index 5eefd283b..57946be87 100644 --- a/pom.xml +++ b/pom.xml @@ -64,6 +64,7 @@ spring-cloud-function-task spring-cloud-function-web spring-cloud-starter-function-web + spring-cloud-starter-function-webflux spring-cloud-function-samples spring-cloud-function-deployer spring-cloud-function-adapters diff --git a/spring-cloud-function-web/pom.xml b/spring-cloud-function-web/pom.xml index 290000d28..545af070f 100644 --- a/spring-cloud-function-web/pom.xml +++ b/spring-cloud-function-web/pom.xml @@ -18,6 +18,12 @@ org.springframework spring-webmvc + true + + + org.springframework + spring-webflux + true org.springframework.cloud diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java new file mode 100644 index 000000000..dff610b90 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java @@ -0,0 +1,317 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.context.message.MessageUtils; +import org.springframework.cloud.function.json.JsonMapper; +import org.springframework.cloud.function.web.util.HeaderUtils; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.http.ResponseEntity.BodyBuilder; +import org.springframework.messaging.Message; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.util.StringUtils; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * @author Dave Syer + * + */ +public class RequestProcessor { + + private static Log logger = LogFactory.getLog(RequestProcessor.class); + + private FunctionInspector inspector; + + private StringConverter converter; + + private JsonMapper mapper; + + @Value("${debug:${DEBUG:false}}") + private String debug = "false"; + + public RequestProcessor(JsonMapper mapper, FunctionInspector inspector, + StringConverter converter) { + this.mapper = mapper; + this.inspector = inspector; + this.converter = converter; + } + + public static FunctionWrapper wrapper(Function, Publisher> function, + Consumer> consumer, Supplier> supplier) { + return new FunctionWrapper(function, consumer, supplier); + } + + public static class FunctionWrapper { + + private Function, Publisher> function; + + private Consumer> consumer; + + private Supplier> supplier; + + private MultiValueMap params = new LinkedMultiValueMap<>(); + + private HttpHeaders headers = new HttpHeaders(); + + private String argument; + + public FunctionWrapper(Function, Publisher> function, + Consumer> consumer, Supplier> supplier) { + this.function = function; + this.consumer = consumer; + this.supplier = supplier; + } + + public Object handler() { + return function != null ? function : consumer != null ? consumer : supplier; + } + + public Function, Publisher> function() { + return this.function; + } + + public Consumer> consumer() { + return this.consumer; + } + + public Supplier> supplier() { + return this.supplier; + } + + public MultiValueMap params() { + return params; + } + + public HttpHeaders headers() { + return this.headers; + } + + public FunctionWrapper headers(HttpHeaders headers) { + this.headers = headers; + return this; + } + + public FunctionWrapper params(MultiValueMap params) { + this.params.addAll(params); + return this; + } + + public FunctionWrapper argument(String argument) { + this.argument = argument; + return this; + } + + public String argument() { + return this.argument; + } + } + + public Mono> post(FunctionWrapper wrapper, String body, + boolean stream) { + + Object function = wrapper.handler(); + if (!StringUtils.hasText(body)) { + return post(wrapper, (List) null, null, stream); + } + body = body.trim(); + Object input; + Class inputType = inspector.getInputType(function); + if (body.startsWith("[")) { + input = mapper.toList(body, inputType); + } + else { + if (inputType == String.class) { + input = body; + } + else if (body.startsWith("{")) { + input = mapper.toSingle(body, inputType); + } + else if (body.startsWith("\"")) { + input = body.substring(1, body.length() - 2); + } + else { + input = converter.convert(function, body); + } + } + if (input instanceof List) { + return post(wrapper, (List) input, null, stream); + } + return post(wrapper, Collections.singletonList(input), null, stream); + } + + private Mono> post(FunctionWrapper wrapper, List body, + MultiValueMap params, boolean stream) { + + Function, Publisher> function = wrapper.function(); + Consumer> consumer = wrapper.consumer(); + + MultiValueMap form = wrapper.params(); + if (params != null) { + form.putAll(params); + } + + Flux flux = body == null ? Flux.just(form) : Flux.fromIterable(body); + if (inspector.isMessage(function)) { + flux = messages(wrapper, function == null ? consumer : function, flux); + } + if (function != null) { + Flux result = Flux.from(function.apply(flux)); + if (logger.isDebugEnabled()) { + logger.debug("Handled POST with function"); + } + if (stream) { + return stream(wrapper, result); + } + return response(function, result, body == null ? null : body.size()<=1, false); + } + + if (consumer != null) { + consumer.accept(flux); + if (logger.isDebugEnabled()) { + logger.debug("Handled POST with consumer"); + } + return Mono.just(ResponseEntity.status(HttpStatus.ACCEPTED).build()); + } + + throw new IllegalArgumentException("no such function"); + } + + private Flux messages(FunctionWrapper request, Object function, Flux flux) { + Map headers = HeaderUtils.fromHttp(request.headers()); + flux = flux.map(payload -> MessageUtils.create(function, payload, headers)); + return flux; + } + + private void addHeaders(BodyBuilder builder, Message message) { + HttpHeaders headers = new HttpHeaders(); + builder.headers(HeaderUtils.fromMessage(message.getHeaders(), headers)); + } + + public Mono> stream(FunctionWrapper request) { + Publisher result; + if (request.function()!=null) { + result = value(request.function(), request.argument()); + } else { + result = supplier(request.supplier()); + } + return stream(request, result); + } + + + private Mono> stream(FunctionWrapper request, Publisher result) { + + BodyBuilder builder = ResponseEntity.ok(); + if (inspector.isMessage(request.handler())) { + result = Flux.from(result) + .doOnNext(value -> addHeaders(builder, (Message) value)) + .map(message -> MessageUtils.unpack(request.handler(), message) + .getPayload()); + } + + Publisher output = result; + return Flux.from(output).then(Mono.fromSupplier(() -> builder.body(output))); + + } + + private Mono> response(Object handler, Publisher result, + Boolean single, boolean getter) { + + BodyBuilder builder = ResponseEntity.ok(); + if (inspector.isMessage(handler)) { + result = Flux.from(result) + .doOnNext(value -> addHeaders(builder, (Message) value)) + .map(message -> MessageUtils.unpack(handler, message).getPayload()); + } + + if (single != null && single && isOutputSingle(handler)) { + result = Mono.from(result); + } + else if (getter && single == null && isOutputSingle(handler)) { + result = Mono.from(result); + } + else if (isInputMultiple(handler) && isOutputSingle(handler)) { + result = Mono.from(result); + } + Publisher output = result; + if (output instanceof Mono) { + return Mono.from(output).flatMap(body -> Mono.just(builder.body(body))); + } + return Flux.from(output).collectList() + .flatMap(body -> Mono.just(builder.body(body))); + } + + private boolean isInputMultiple(Object handler) { + Class type = inspector.getInputType(handler); + Class wrapper = inspector.getInputWrapper(handler); + return Collection.class.isAssignableFrom(type) || Flux.class.equals(wrapper); + } + + private boolean isOutputSingle(Object handler) { + Class type = inspector.getOutputType(handler); + Class wrapper = inspector.getOutputWrapper(handler); + if (Stream.class.isAssignableFrom(type)) { + return false; + } + if (wrapper == type) { + return true; + } + return Mono.class.equals(wrapper) || Optional.class.equals(wrapper); + } + + private Publisher supplier(Supplier> supplier) { + Publisher result = supplier.get(); + return result; + } + + private Mono value(Function, Publisher> function, String value) { + Object input = converter.convert(function, value); + Mono result = Mono.from(function.apply(Flux.just(input))); + return result; + } + + public Mono> get(FunctionWrapper wrapper) { + if (wrapper.function() != null) { + return response(wrapper.function(), value(wrapper.function(), wrapper.argument()), true, true); + } + else { + return response(wrapper.supplier(), supplier(wrapper.supplier()), null, true); + } + } + + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java index f59f2afe5..c1e25cdb9 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java @@ -17,12 +17,14 @@ package org.springframework.cloud.function.web; import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; /** * @author Mark Fisher */ -@SpringBootApplication +@SpringBootConfiguration +@EnableAutoConfiguration public class RestApplication { public static void main(String[] args) { diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/StringConverter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/StringConverter.java similarity index 93% rename from spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/StringConverter.java rename to spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/StringConverter.java index c3ad9d96c..092353c3a 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/StringConverter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/StringConverter.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.cloud.function.web.flux; +package org.springframework.cloud.function.web; /** * @author Dave Syer diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/constants/WebRequestConstants.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/constants/WebRequestConstants.java similarity index 82% rename from spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/constants/WebRequestConstants.java rename to spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/constants/WebRequestConstants.java index 4d636133a..ed111dfb5 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/constants/WebRequestConstants.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/constants/WebRequestConstants.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.cloud.function.web.flux.constants; +package org.springframework.cloud.function.web.constants; /** * Common storage for web request attribute names (in a separate package to avoid cycles). @@ -33,9 +33,5 @@ public abstract class WebRequestConstants { + ".argument"; public static final String HANDLER = WebRequestConstants.class.getName() + ".handler"; - public static final String INPUT_SINGLE = WebRequestConstants.class.getName() - + ".input_single"; - public static final String OUTPUT_SINGLE = WebRequestConstants.class.getName() - + ".output_single"; } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java index 6892f0e55..79d7e6086 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java @@ -16,32 +16,24 @@ package org.springframework.cloud.function.web.flux; -import java.util.Collection; -import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Stream; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; -import org.springframework.cloud.function.context.catalog.FunctionInspector; -import org.springframework.cloud.function.context.message.MessageUtils; -import org.springframework.cloud.function.web.flux.constants.WebRequestConstants; -import org.springframework.cloud.function.web.flux.request.FluxFormRequest; -import org.springframework.cloud.function.web.flux.request.FluxRequest; -import org.springframework.http.HttpStatus; +import org.springframework.cloud.function.web.RequestProcessor; +import org.springframework.cloud.function.web.RequestProcessor.FunctionWrapper; +import org.springframework.cloud.function.web.constants.WebRequestConstants; +import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.ResponseBody; -import org.springframework.web.context.request.WebRequest; +import org.springframework.web.server.ServerWebExchange; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -51,149 +43,65 @@ import reactor.core.publisher.Mono; @Component public class FunctionController { - private static Log logger = LogFactory.getLog(FunctionController.class); + private RequestProcessor processor; - private FunctionInspector inspector; - - private boolean debug = false; - - private StringConverter converter; - - public FunctionController(FunctionInspector inspector, StringConverter converter) { - this.inspector = inspector; - this.converter = converter; + public FunctionController(RequestProcessor processor) { + this.processor = processor; } - public void setDebug(boolean debug) { - this.debug = debug; + @PostMapping(path = "/**", consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE) + @ResponseBody + public Mono> form(ServerWebExchange request) { + FunctionWrapper wrapper = wrapper(request); + return request.getFormData().doOnSuccess(params -> wrapper.params(params)) + .then(processor.post(wrapper, null, false)); } @PostMapping(path = "/**") @ResponseBody - public ResponseEntity> post(WebRequest request, - @RequestBody FluxRequest body) { - - @SuppressWarnings("unchecked") - Function, Publisher> function = (Function, Publisher>) request - .getAttribute(WebRequestConstants.FUNCTION, WebRequest.SCOPE_REQUEST); - @SuppressWarnings("unchecked") - Consumer> consumer = (Consumer>) request - .getAttribute(WebRequestConstants.CONSUMER, WebRequest.SCOPE_REQUEST); - Boolean single = (Boolean) request.getAttribute(WebRequestConstants.INPUT_SINGLE, - WebRequest.SCOPE_REQUEST); - - FluxFormRequest form = FluxFormRequest.from(request.getParameterMap()); - - if (function != null) { - Flux flux = body.body() == null ? form.flux() : body.flux(); - if (debug) { - flux = flux.log(); - } - Flux result = Flux.from(function.apply(flux)); - if (inspector.isMessage(function)) { - result = result.map(message -> MessageUtils.unpack(function, message)); - } - if (logger.isDebugEnabled()) { - logger.debug("Handled POST with function"); - } - return ResponseEntity.ok().body(response(request, function, single, result)); - } - - if (consumer != null) { - Flux flux = body.body() == null ? form.flux().cache() - : body.flux().cache(); // send a copy back to the caller - if (debug) { - flux = flux.log(); - } - consumer.accept(flux); - if (logger.isDebugEnabled()) { - logger.debug("Handled POST with consumer"); - } - return ResponseEntity.status(HttpStatus.ACCEPTED).body(flux); - } - - throw new IllegalArgumentException("no such function"); + public Mono> post(ServerWebExchange request, + @RequestBody(required = false) String body) { + FunctionWrapper wrapper = wrapper(request); + return processor.post(wrapper, body, false); } - private Publisher response(WebRequest request, Object handler, Boolean single, - Publisher result) { - - if (single != null && single && isOutputSingle(handler)) { - request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, true, - WebRequest.SCOPE_REQUEST); - return Mono.from(result); - } - - if (isInputMultiple(handler) && isOutputSingle(handler)) { - request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, true, - WebRequest.SCOPE_REQUEST); - return Mono.from(result); - } - - request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, false, - WebRequest.SCOPE_REQUEST); - - return result; - } - - private boolean isInputMultiple(Object handler) { - Class type = inspector.getInputType(handler); - Class wrapper = inspector.getInputWrapper(handler); - return Collection.class.isAssignableFrom(type) || Flux.class.equals(wrapper); - } - - private boolean isOutputSingle(Object handler) { - Class type = inspector.getOutputType(handler); - Class wrapper = inspector.getOutputWrapper(handler); - if (Stream.class.isAssignableFrom(type)) { - return false; - } - if (wrapper == type) { - return true; - } - return Mono.class.equals(wrapper) || Optional.class.equals(wrapper); + @PostMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @ResponseBody + public Mono> postStream(ServerWebExchange request, + @RequestBody(required = false) String body) { + FunctionWrapper wrapper = wrapper(request); + return processor.post(wrapper, body, true); } @GetMapping(path = "/**") @ResponseBody - public ResponseEntity> get(WebRequest request) { + public Mono> get(ServerWebExchange request) { + FunctionWrapper wrapper = wrapper(request); + return processor.get(wrapper); + } + + @GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @ResponseBody + public Mono> getStream(ServerWebExchange request) { + FunctionWrapper wrapper = wrapper(request); + return processor.stream(wrapper); + } + + private FunctionWrapper wrapper(ServerWebExchange request) { @SuppressWarnings("unchecked") Function, Publisher> function = (Function, Publisher>) request - .getAttribute(WebRequestConstants.FUNCTION, WebRequest.SCOPE_REQUEST); + .getAttribute(WebRequestConstants.FUNCTION); + @SuppressWarnings("unchecked") + Consumer> consumer = (Consumer>) request + .getAttribute(WebRequestConstants.CONSUMER); @SuppressWarnings("unchecked") Supplier> supplier = (Supplier>) request - .getAttribute(WebRequestConstants.SUPPLIER, WebRequest.SCOPE_REQUEST); - String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT, - WebRequest.SCOPE_REQUEST); - - Publisher result; - if (function != null) { - result = value(function, argument); - } - else { - result = response(request, supplier, true, supplier(supplier)); - } - if (inspector.isMessage(function)) { - result = Flux.from(result) - .map(message -> MessageUtils.unpack(function, message)); - } - return ResponseEntity.ok().body(result); - } - - private Publisher supplier(Supplier> supplier) { - Publisher result = supplier.get(); - if (logger.isDebugEnabled()) { - logger.debug("Handled GET with supplier"); - } - return debug ? Flux.from(result).log() : result; - } - - private Mono value(Function, Publisher> function, String value) { - Object input = converter.convert(function, value); - Mono result = Mono.from(function.apply(Flux.just(input))); - if (logger.isDebugEnabled()) { - logger.debug("Handled GET with function"); - } - return debug ? result.log() : result; + .getAttribute(WebRequestConstants.SUPPLIER); + FunctionWrapper wrapper = RequestProcessor.wrapper(function, consumer, supplier); + wrapper.headers(request.getRequest().getHeaders()); + wrapper.params(request.getRequest().getQueryParams()); + String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT); + wrapper.argument(argument); + return wrapper; } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java index d2bf970f7..94033a428 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java @@ -20,8 +20,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import javax.servlet.http.HttpServletRequest; - import org.reactivestreams.Publisher; import org.springframework.beans.factory.InitializingBean; @@ -29,12 +27,15 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.cloud.function.context.FunctionCatalog; -import org.springframework.cloud.function.web.flux.constants.WebRequestConstants; +import org.springframework.cloud.function.web.constants.WebRequestConstants; import org.springframework.context.annotation.Configuration; +import org.springframework.http.HttpMethod; import org.springframework.util.StringUtils; import org.springframework.web.method.HandlerMethod; -import org.springframework.web.servlet.HandlerMapping; -import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping; +import org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping; +import org.springframework.web.server.ServerWebExchange; + +import reactor.core.publisher.Mono; /** * @author Dave Syer @@ -52,9 +53,6 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping @Value("${spring.cloud.function.web.path:}") private String prefix = ""; - @Value("${debug:${DEBUG:false}}") - private String debug = "false"; - @Autowired public FunctionHandlerMapping(FunctionCatalog catalog, FunctionController controller) { @@ -67,7 +65,7 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping @Override public void afterPropertiesSet() { super.afterPropertiesSet(); - this.controller.setDebug(!"false".equals(debug)); + // this.controller.setDebug(!"false".equals(debug)); detectHandlerMethods(controller); while (prefix.endsWith("/")) { prefix = prefix.substring(0, prefix.length() - 1); @@ -79,68 +77,58 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping } @Override - protected HandlerMethod getHandlerInternal(HttpServletRequest request) - throws Exception { - HandlerMethod handler = super.getHandlerInternal(request); - if (handler == null) { - return null; - } - String path = (String) request - .getAttribute(HandlerMapping.PATH_WITHIN_HANDLER_MAPPING_ATTRIBUTE); + public Mono getHandlerInternal(ServerWebExchange request) { + String path = request.getRequest().getPath().pathWithinApplication().value(); if (StringUtils.hasText(prefix) && !path.startsWith(prefix)) { - return null; + return Mono.empty(); + } + Mono handler = super.getHandlerInternal(request); + if (path == null) { + return handler; } if (path.startsWith(prefix)) { path = path.substring(prefix.length()); } - if (path == null) { - return handler; - } Object function = findFunctionForGet(request, path); - if (function != null) { - if (logger.isDebugEnabled()) { - logger.debug("Found function for GET: " + path); - } - request.setAttribute(WebRequestConstants.HANDLER, function); - return handler; + if (function == null) { + function = findFunctionForPost(request, path); } - function = findFunctionForPost(request, path); if (function != null) { if (logger.isDebugEnabled()) { logger.debug("Found function for POST: " + path); } - request.setAttribute(WebRequestConstants.HANDLER, function); - return handler; + request.getAttributes().put(WebRequestConstants.HANDLER, function); } - return null; + Object actual = function; + return handler.filter(method -> actual != null); } - private Object findFunctionForPost(HttpServletRequest request, String path) { - if (!request.getMethod().equals("POST")) { + private Object findFunctionForPost(ServerWebExchange request, String path) { + if (!request.getRequest().getMethod().equals(HttpMethod.POST)) { return null; } path = path.startsWith("/") ? path.substring(1) : path; Consumer> consumer = functions.lookup(Consumer.class, path); if (consumer != null) { - request.setAttribute(WebRequestConstants.CONSUMER, consumer); + request.getAttributes().put(WebRequestConstants.CONSUMER, consumer); return consumer; } Function function = functions.lookup(Function.class, path); if (function != null) { - request.setAttribute(WebRequestConstants.FUNCTION, function); + request.getAttributes().put(WebRequestConstants.FUNCTION, function); return function; } return null; } - private Object findFunctionForGet(HttpServletRequest request, String path) { - if (!request.getMethod().equals("GET")) { + private Object findFunctionForGet(ServerWebExchange request, String path) { + if (!request.getRequest().getMethod().equals(HttpMethod.GET)) { return null; } path = path.startsWith("/") ? path.substring(1) : path; Supplier> supplier = functions.lookup(Supplier.class, path); if (supplier != null) { - request.setAttribute(WebRequestConstants.SUPPLIER, supplier); + request.getAttributes().put(WebRequestConstants.SUPPLIER, supplier); return supplier; } StringBuilder builder = new StringBuilder(); @@ -156,8 +144,8 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping : null; Function function = functions.lookup(Function.class, name); if (function != null) { - request.setAttribute(WebRequestConstants.FUNCTION, function); - request.setAttribute(WebRequestConstants.ARGUMENT, value); + request.getAttributes().put(WebRequestConstants.FUNCTION, function); + request.getAttributes().put(WebRequestConstants.ARGUMENT, value); return function; } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java index 0a9a913a2..5998d9008 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java @@ -16,34 +16,24 @@ package org.springframework.cloud.function.web.flux; -import java.util.ArrayList; -import java.util.List; - -import org.springframework.beans.factory.SmartInitializingSingleton; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication; +import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication.Type; import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration; -import org.springframework.boot.autoconfigure.http.HttpMessageConverters; import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.FunctionInspector; -import org.springframework.cloud.function.json.JsonMapper; -import org.springframework.cloud.function.web.flux.request.FluxHandlerMethodArgumentResolver; -import org.springframework.cloud.function.web.flux.response.FluxReturnValueHandler; -import org.springframework.context.ApplicationContext; +import org.springframework.cloud.function.web.RequestProcessor; +import org.springframework.cloud.function.web.StringConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; -import org.springframework.web.method.support.HandlerMethodArgumentResolver; -import org.springframework.web.method.support.HandlerMethodReturnValueHandler; -import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter; import reactor.core.publisher.Flux; @@ -53,15 +43,12 @@ import reactor.core.publisher.Flux; * @author Oleg Zhurakousky */ @Configuration -@ConditionalOnWebApplication @ConditionalOnClass({ Flux.class, AsyncHandlerMethodReturnValueHandler.class }) -@Import(FunctionController.class) +@ConditionalOnWebApplication(type=Type.REACTIVE) +@Import({FunctionController.class, RequestProcessor.class}) @AutoConfigureAfter({ JacksonAutoConfiguration.class, GsonAutoConfiguration.class }) public class ReactorAutoConfiguration { - @Autowired - private ApplicationContext context; - @Bean public FunctionHandlerMapping functionHandlerMapping(FunctionCatalog catalog, FunctionController controller) { @@ -75,44 +62,6 @@ public class ReactorAutoConfiguration { return new BasicStringConverter(inspector, beanFactory); } - @Configuration - protected static class FluxReturnValueConfiguration { - @Bean - public FluxReturnValueHandler fluxReturnValueHandler(FunctionInspector inspector, - HttpMessageConverters converters) { - return new FluxReturnValueHandler(inspector, converters.getConverters()); - } - } - - @Configuration - protected static class FluxArgumentResolverConfiguration { - @Bean - public FluxHandlerMethodArgumentResolver fluxHandlerMethodArgumentResolver( - FunctionInspector inspector, JsonMapper mapper) { - return new FluxHandlerMethodArgumentResolver(inspector, mapper); - } - } - - @Bean - public SmartInitializingSingleton fluxRequestMappingHandlerAdapterProcessor( - RequestMappingHandlerAdapter adapter, - FluxHandlerMethodArgumentResolver resolver) { - return new SmartInitializingSingleton() { - - @Override - public void afterSingletonsInstantiated() { - List resolvers = new ArrayList<>( - adapter.getArgumentResolvers()); - resolvers.add(0, resolver); - adapter.setArgumentResolvers(resolvers); - List handlers = new ArrayList<>( - adapter.getReturnValueHandlers()); - handlers.add(0, context.getBean(FluxReturnValueHandler.class)); - adapter.setReturnValueHandlers(handlers); - } - - }; - } private static class BasicStringConverter implements StringConverter { diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/DelegateHandler.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/DelegateHandler.java deleted file mode 100644 index 1749d2580..000000000 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/DelegateHandler.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2016-2017 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.web.flux.request; - -import org.springframework.beans.factory.ListableBeanFactory; -import org.springframework.cloud.function.context.catalog.FunctionInspector; - -public abstract class DelegateHandler { - - private final ListableBeanFactory factory; - private FunctionInspector processor; - private Object handler; - private final Object source; - - public DelegateHandler(ListableBeanFactory factory, Object source) { - this.factory = factory; - this.source = source; - } - - public Class type() { - return processor().getInputType(handler()); - } - - private Object handler() { - if (handler == null) { - handler = source instanceof String ? factory.getBean((String) source) - : source; - } - return handler; - } - - private FunctionInspector processor() { - if (processor == null) { - processor = factory.getBean(FunctionInspector.class); - } - return processor; - } - -} \ No newline at end of file diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxFormRequest.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxFormRequest.java deleted file mode 100644 index daa7cb6ef..000000000 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxFormRequest.java +++ /dev/null @@ -1,41 +0,0 @@ -package org.springframework.cloud.function.web.flux.request; - -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; -import reactor.core.publisher.Flux; - -import java.util.Arrays; -import java.util.Map; - -public class FluxFormRequest { - - private Map map; - - public FluxFormRequest(Map map) { - this.map = map; - } - - public static FluxFormRequest from(Map map) { - return new FluxFormRequest<>(map); - } - - public Flux> flux() { - return Flux.just(buildMap()); - } - - public MultiValueMap body() { - return buildMap(); - } - - private MultiValueMap buildMap() { - - if (map == null) - return null; - - MultiValueMap result = new LinkedMultiValueMap<>(); - map.forEach((key, values) -> result.put(key, Arrays.asList(values))); - return result; - - } - -} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java deleted file mode 100644 index 386be5b43..000000000 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright 2016-2017 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.web.flux.request; - -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import javax.servlet.http.HttpServletRequest; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.cloud.function.context.catalog.FunctionInspector; -import org.springframework.cloud.function.context.message.MessageUtils; -import org.springframework.cloud.function.json.JsonMapper; -import org.springframework.cloud.function.web.flux.constants.WebRequestConstants; -import org.springframework.cloud.function.web.util.HeaderUtils; -import org.springframework.core.MethodParameter; -import org.springframework.core.Ordered; -import org.springframework.http.MediaType; -import org.springframework.http.server.ServletServerHttpRequest; -import org.springframework.messaging.MessageHeaders; -import org.springframework.util.StreamUtils; -import org.springframework.util.StringUtils; -import org.springframework.web.bind.support.WebDataBinderFactory; -import org.springframework.web.context.request.NativeWebRequest; -import org.springframework.web.method.support.HandlerMethodArgumentResolver; -import org.springframework.web.method.support.ModelAndViewContainer; -import org.springframework.web.util.ContentCachingRequestWrapper; - -/** - * Converter for request bodies of type Flux. - * - * @author Dave Syer - * - */ -public class FluxHandlerMethodArgumentResolver - implements HandlerMethodArgumentResolver, Ordered { - - private static Log logger = LogFactory - .getLog(FluxHandlerMethodArgumentResolver.class); - - private final JsonMapper mapper; - - private FunctionInspector inspector; - - public FluxHandlerMethodArgumentResolver(FunctionInspector inspector, - JsonMapper mapper) { - this.inspector = inspector; - this.mapper = mapper; - } - - @Override - public int getOrder() { - return Ordered.HIGHEST_PRECEDENCE; - } - - @Override - public Object resolveArgument(MethodParameter parameter, - ModelAndViewContainer mavContainer, NativeWebRequest webRequest, - WebDataBinderFactory binderFactory) throws Exception { - Object handler = webRequest.getAttribute(WebRequestConstants.HANDLER, - NativeWebRequest.SCOPE_REQUEST); - Class type = inspector.getInputType(handler); - if (type == null) { - type = Object.class; - } - boolean message = inspector.isMessage(handler); - List body; - ContentCachingRequestWrapper nativeRequest = new ContentCachingRequestWrapper( - webRequest.getNativeRequest(HttpServletRequest.class)); - if (logger.isDebugEnabled()) { - logger.debug("Resolving request body into type: " + type); - } - if (isPlainText(webRequest) && CharSequence.class.isAssignableFrom(type)) { - body = Arrays.asList(StreamUtils.copyToString(nativeRequest.getInputStream(), - Charset.forName("UTF-8"))); - nativeRequest.setAttribute(WebRequestConstants.INPUT_SINGLE, true); - } - else { - String json = new String(StreamUtils.copyToString( - nativeRequest.getInputStream(), Charset.forName("UTF-8"))); - if (!StringUtils.hasText(json)) { - body = null; - } - else { - if (json.startsWith("[")) { - body = mapper.toList(json, type); - } - else { - nativeRequest.setAttribute(WebRequestConstants.INPUT_SINGLE, true); - body = Arrays.asList(mapper.toSingle(json, type)); - } - } - } - if (body != null && message) { - List messages = new ArrayList<>(); - MessageHeaders headers = HeaderUtils.fromHttp(new ServletServerHttpRequest( - webRequest.getNativeRequest(HttpServletRequest.class)).getHeaders()); - for (Object payload : body) { - messages.add(MessageUtils.create(handler, payload, headers)); - } - body = messages; - } - return new FluxRequest<>(body); - } - - private boolean isPlainText(NativeWebRequest webRequest) { - String value = webRequest.getHeader("Content-Type"); - if (value != null) { - return MediaType.valueOf(value).isCompatibleWith(MediaType.TEXT_PLAIN); - } - return false; - } - - @Override - public boolean supportsParameter(MethodParameter parameter) { - return FluxRequest.class.isAssignableFrom(parameter.getParameterType()); - } - -} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxRequest.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxRequest.java deleted file mode 100644 index 00b2225fc..000000000 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxRequest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2016-2017 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.web.flux.request; - -import java.util.List; - -import reactor.core.publisher.Flux; - -/** - * @author Dave Syer - * - */ -public class FluxRequest { - - private List body; - - public FluxRequest(List body) { - this.body = body; - } - - public Flux flux() { - return Flux.fromIterable(body); - } - - public List body() { - return body; - } - -} - - diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxResponseBodyEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxResponseBodyEmitter.java deleted file mode 100644 index d8814cca9..000000000 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxResponseBodyEmitter.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2013-2017 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.web.flux.response; - -import org.reactivestreams.Publisher; - -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import org.springframework.http.server.ServerHttpResponse; -import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; - -import reactor.core.publisher.Flux; - -/** - * A specialized {@link ResponseBodyEmitter} that handles {@link Flux} return types. - * - * @author Dave Syer - */ -class FluxResponseBodyEmitter extends ResponseBodyEmitter { - - private final MediaType mediaType; - private ResponseBodyEmitterSubscriber subscriber; - - public FluxResponseBodyEmitter(Publisher observable) { - this(new HttpHeaders(), null, observable); - } - - public FluxResponseBodyEmitter(HttpHeaders request, MediaType mediaType, - Publisher observable) { - super(); - this.mediaType = mediaType; - this.subscriber = new ResponseBodyEmitterSubscriber(request, mediaType, - observable, this, MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)); - } - - @Override - protected void extendResponse(ServerHttpResponse outputMessage) { - super.extendResponse(outputMessage); - this.subscriber.extendResponse(outputMessage); - HttpHeaders headers = outputMessage.getHeaders(); - if (headers.getContentType() == null && this.mediaType != null - && !MediaType.ALL.equals(this.mediaType)) { - headers.setContentType(this.mediaType); - } - } -} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxResponseSseEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxResponseSseEmitter.java deleted file mode 100644 index 07d9e0c5b..000000000 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxResponseSseEmitter.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2013-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.web.flux.response; - -import org.reactivestreams.Publisher; - -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import org.springframework.http.server.ServerHttpResponse; -import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -import reactor.core.publisher.Flux; - -/** - * A specialized {@link ResponseBodyEmitter} that handles {@link Flux} return types with - * SSE streams. - * - * @author Dave Syer - */ -class FluxResponseSseEmitter extends SseEmitter { - - private ResponseBodyEmitterSubscriber subscriber; - - public FluxResponseSseEmitter(Publisher observable) { - this(new HttpHeaders(), MediaType.valueOf("text/plain"), observable); - } - - public FluxResponseSseEmitter(HttpHeaders request, MediaType mediaType, - Publisher observable) { - super(); - this.subscriber = new ResponseBodyEmitterSubscriber(request, mediaType, - observable, this, false); - } - - @Override - protected void extendResponse(ServerHttpResponse outputMessage) { - super.extendResponse(outputMessage); - this.subscriber.extendResponse(outputMessage); - } -} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxReturnValueHandler.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxReturnValueHandler.java deleted file mode 100644 index afb7e5ae9..000000000 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxReturnValueHandler.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Copyright 2013-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.web.flux.response; - -import java.lang.reflect.Method; -import java.time.Duration; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; - -import org.springframework.cloud.function.context.catalog.FunctionInspector; -import org.springframework.cloud.function.web.flux.constants.WebRequestConstants; -import org.springframework.cloud.function.web.util.HeaderUtils; -import org.springframework.core.MethodParameter; -import org.springframework.core.ResolvableType; -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; -import org.springframework.http.converter.HttpMessageConverter; -import org.springframework.http.server.ServletServerHttpRequest; -import org.springframework.messaging.Message; -import org.springframework.util.ReflectionUtils; -import org.springframework.web.context.request.NativeWebRequest; -import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; -import org.springframework.web.method.support.ModelAndViewContainer; -import org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor; -import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; -import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * A specialized {@link AsyncHandlerMethodReturnValueHandler} that handles {@link Flux} - * return types. - * - * @author Dave Syer - */ -public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHandler { - - private static Log logger = LogFactory.getLog(FluxReturnValueHandler.class); - - private ResponseBodyEmitterReturnValueHandler delegate; - private RequestResponseBodyMethodProcessor single; - private long timeout = 1000L; - private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); - - private FunctionInspector inspector; - - private MethodParameter singleReturnType; - - public FluxReturnValueHandler(FunctionInspector inspector, - List> messageConverters) { - this.inspector = inspector; - this.delegate = new ResponseBodyEmitterReturnValueHandler(messageConverters); - this.single = new RequestResponseBodyMethodProcessor(messageConverters); - Method method = ReflectionUtils.findMethod(getClass(), "singleValue"); - singleReturnType = new MethodParameter(method, -1); - } - - ResponseEntity singleValue() { - return null; - } - - /** - * Timeout for clients. If no items are seen on an HTTP response in this period then - * the response is closed. - * - * @param timeout the timeout to set - */ - public void setTimeout(long timeout) { - this.timeout = timeout; - } - - @Override - public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { - if (returnValue != null) { - return supportsReturnType(returnType); - } - return false; - } - - @Override - public boolean supportsReturnType(MethodParameter returnType) { - return (returnType.getParameterType() != null - && (Publisher.class.isAssignableFrom(returnType.getParameterType()) - || isResponseEntity(returnType))) - || Publisher.class - .isAssignableFrom(returnType.getMethod().getReturnType()); - } - - private boolean isResponseEntity(MethodParameter returnType) { - if (ResponseEntity.class.isAssignableFrom(returnType.getParameterType())) { - Class bodyType = ResolvableType.forMethodParameter(returnType) - .getGeneric(0).resolve(); - return bodyType != null && Publisher.class.isAssignableFrom(bodyType); - } - return false; - } - - @Override - public void handleReturnValue(Object returnValue, MethodParameter returnType, - ModelAndViewContainer mavContainer, NativeWebRequest webRequest) - throws Exception { - - if (returnValue == null) { - mavContainer.setRequestHandled(true); - return; - } - - Object adaptFrom = returnValue; - if (returnValue instanceof ResponseEntity) { - ResponseEntity value = (ResponseEntity) returnValue; - adaptFrom = value.getBody(); - HttpServletResponse response = webRequest - .getNativeResponse(HttpServletResponse.class); - response.setStatus(value.getStatusCodeValue()); - HttpHeaders headers = value.getHeaders(); - for (String name : headers.keySet()) { - List list = headers.get(name); - for (String header : list) { - response.addHeader(name, header); - } - } - } - Publisher flux = (Publisher) adaptFrom; - - Object handler = webRequest.getAttribute(WebRequestConstants.HANDLER, - NativeWebRequest.SCOPE_REQUEST); - Class type = inspector.getOutputType(handler); - - if (isOutputSingle(webRequest, handler, type)) { - Object result = Flux.from(flux).blockFirst(); - if (result instanceof Message) { - Message message = (Message) result; - result = message.getPayload(); - addHeaders(webRequest, message); - } - single.handleReturnValue(result, singleReturnType, mavContainer, webRequest); - return; - } - - MediaType mediaType = null; - if (isPlainText(webRequest) && (CharSequence.class.isAssignableFrom(type) - || Void.class.isAssignableFrom(type))) { - mediaType = MediaType.TEXT_PLAIN; - } - else { - mediaType = findMediaType(webRequest); - } - if (logger.isDebugEnabled()) { - logger.debug( - "Handling return value " + type + " with media type: " + mediaType); - } - ServletServerHttpRequest request = new ServletServerHttpRequest( - webRequest.getNativeRequest(HttpServletRequest.class)); - delegate.handleReturnValue( - getEmitter(timeout, flux, mediaType, request.getHeaders()), returnType, - mavContainer, webRequest); - } - - private void addHeaders(NativeWebRequest webRequest, Message message) { - HttpServletResponse response = webRequest - .getNativeResponse(HttpServletResponse.class); - ServletServerHttpRequest request = new ServletServerHttpRequest( - webRequest.getNativeRequest(HttpServletRequest.class)); - HttpHeaders headers = HeaderUtils.fromMessage(message.getHeaders(), - request.getHeaders()); - for (String name : headers.keySet()) { - for (Object object : headers.get(name)) { - response.addHeader(name, object.toString()); - } - } - } - - private boolean isOutputSingle(NativeWebRequest webRequest, Object handler, - Class type) { - Boolean single = (Boolean) webRequest.getAttribute( - WebRequestConstants.OUTPUT_SINGLE, NativeWebRequest.SCOPE_REQUEST); - if (single == null) { - // If the declared return type is a collection then we can render it as a - // "single" value - return Collection.class.isAssignableFrom(type); - } - return single; - } - - private MediaType findMediaType(NativeWebRequest webRequest) { - List accepts = Arrays.asList(MediaType.ALL); - MediaType mediaType = null; - if (webRequest.getHeader("Accept") != null) { - accepts = MediaType.parseMediaTypes(webRequest.getHeader("Accept")); - for (MediaType accept : accepts) { - if (!MediaType.ALL.equals(accept) - && MediaType.APPLICATION_JSON.isCompatibleWith(accept)) { - mediaType = MediaType.APPLICATION_JSON; - // Prefer JSON if that is acceptable - break; - } - else if (mediaType == null) { - mediaType = accept; - } - } - } - if (mediaType == null) { - mediaType = MediaType.APPLICATION_JSON; - } - return mediaType; - } - - private boolean isPlainText(NativeWebRequest webRequest) { - String value = webRequest.getHeader("Content-Type"); - if (value != null) { - return MediaType.valueOf(value).isCompatibleWith(MediaType.TEXT_PLAIN); - } - return false; - } - - private ResponseBodyEmitter getEmitter(Long timeout, Publisher flux, - MediaType mediaType, HttpHeaders request) { - Publisher exported = flux instanceof Mono ? Mono.from(flux) - : Flux.from(flux).timeout(Duration.ofMillis(timeout), Flux.empty()); - if (!MediaType.ALL.equals(mediaType) - && EVENT_STREAM.isCompatibleWith(mediaType)) { - // TODO: more subtle content negotiation - return new FluxResponseSseEmitter(request, MediaType.APPLICATION_JSON, - exported); - } - return new FluxResponseBodyEmitter(request, mediaType, exported); - } - -} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/ResponseBodyEmitterSubscriber.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/ResponseBodyEmitterSubscriber.java deleted file mode 100644 index 3c5961561..000000000 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/ResponseBodyEmitterSubscriber.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Copyright 2013-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.web.flux.response; - -import java.io.IOException; -import java.util.concurrent.TimeoutException; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import org.springframework.cloud.function.web.util.HeaderUtils; -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import org.springframework.http.server.ServerHttpResponse; -import org.springframework.messaging.Message; -import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * Subscriber that emits any value produced by the {@link Flux} into the delegated - * {@link ResponseBodyEmitter}. - * - * @author Dave Syer - */ -class ResponseBodyEmitterSubscriber implements Subscriber { - - private final MediaType mediaType; - - private Subscription subscription; - - private final ResponseBodyEmitter responseBodyEmitter; - - private boolean completed; - - private boolean firstElementWritten; - - private boolean single; - - private final boolean json; - - private Message first; - - private final HttpHeaders request; - - public ResponseBodyEmitterSubscriber(HttpHeaders request, MediaType mediaType, - Publisher observable, ResponseBodyEmitter responseBodyEmitter, - boolean json) { - - this.request = request; - this.mediaType = mediaType; - this.responseBodyEmitter = responseBodyEmitter; - this.json = json; - this.responseBodyEmitter.onTimeout(new Timeout()); - this.responseBodyEmitter.onCompletion(new Complete()); - this.single = observable instanceof Mono; - observable.subscribe(this); - } - - public void extendResponse(ServerHttpResponse response) { - headers(response); - } - - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - subscription.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Object value) { - - Object object = value; - - if (object instanceof Message) { - Message message = (Message) object; - object = message.getPayload(); - this.first = message; - } - - try { - if (isJson()) { - if (!this.firstElementWritten) { - if (!single) { - responseBodyEmitter.send("["); - this.firstElementWritten = true; - } - } - else { - responseBodyEmitter.send(","); - } - if (!single && object.getClass() == String.class - && !((String) object).contains("\"")) { - object = "\"" + object + "\""; - } - } - if (!completed) { - responseBodyEmitter.send(object, mediaType); - } - } - catch ( - - IOException e) { - throw new RuntimeException(e.getMessage(), e); - } - } - - private void headers(ServerHttpResponse response) { - if (this.first != null) { - Message message = first; - try { - HttpHeaders headers = HeaderUtils.fromMessage(message.getHeaders(), - request); - for (String name : headers.keySet()) { - for (String value : headers.get(name)) { - response.getHeaders().add(name, value); - } - } - } - catch (Exception e) { - // Headers could not be set - } - } - } - - @Override - public void onError(Throwable e) { - if (!completed) { - completed = true; - try { - if (isJson()) { - if (!single) { - if (!this.firstElementWritten) { - responseBodyEmitter.send("[]"); - } - else { - responseBodyEmitter.send("]"); - } - } - } - if (e instanceof TimeoutException) { - responseBodyEmitter.complete(); - } - else { - responseBodyEmitter.completeWithError(e); - } - } - catch (IOException ex) { - throw new RuntimeException(ex.getMessage(), ex); - } - } - } - - @Override - public void onComplete() { - if (!completed) { - completed = true; - try { - if (isJson()) { - if (!single) { - if (!this.firstElementWritten) { - responseBodyEmitter.send("["); - } - responseBodyEmitter.send("]"); - } - } - } - catch (IOException e) { - throw new RuntimeException(e.getMessage(), e); - } - responseBodyEmitter.complete(); - } - } - - private boolean isJson() { - return json; - } - - class Complete implements Runnable { - - @Override - public void run() { - ResponseBodyEmitterSubscriber.this.subscription.cancel(); - } - } - - class Timeout implements Runnable { - - @Override - public void run() { - onComplete(); - ResponseBodyEmitterSubscriber.this.subscription.cancel(); - } - } - -} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java new file mode 100644 index 000000000..b6a63f30d --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java @@ -0,0 +1,116 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.mvc; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.reactivestreams.Publisher; + +import org.springframework.cloud.function.web.RequestProcessor; +import org.springframework.cloud.function.web.RequestProcessor.FunctionWrapper; +import org.springframework.cloud.function.web.constants.WebRequestConstants; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.context.request.WebRequest; + +import reactor.core.publisher.Mono; + +/** + * @author Dave Syer + * @author Mark Fisher + */ +@Component +public class FunctionController { + + private RequestProcessor processor; + + public FunctionController(RequestProcessor processor) { + this.processor = processor; + } + + @PostMapping(path = "/**", consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE) + @ResponseBody + public Mono> form(WebRequest request) { + FunctionWrapper wrapper = wrapper(request); + return processor.post(wrapper, null, false); + } + + @PostMapping(path = "/**") + @ResponseBody + public Mono> post(WebRequest request, + @RequestBody(required = false) String body) { + FunctionWrapper wrapper = wrapper(request); + return processor.post(wrapper, body, false); + } + + @PostMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @ResponseBody + public Mono>> postStream(WebRequest request, + @RequestBody(required = false) String body) { + FunctionWrapper wrapper = wrapper(request); + return processor.post(wrapper, body, true).map(response -> ResponseEntity.ok() + .headers(response.getHeaders()).body((Publisher) response.getBody())); + } + + @GetMapping(path = "/**") + @ResponseBody + public Mono> get(WebRequest request) { + FunctionWrapper wrapper = wrapper(request); + return processor.get(wrapper); + } + + @GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @ResponseBody + public Mono>> getStream(WebRequest request) { + FunctionWrapper wrapper = wrapper(request); + return processor.stream(wrapper).map(response -> ResponseEntity.ok() + .headers(response.getHeaders()).body((Publisher) response.getBody())); + } + + private FunctionWrapper wrapper(WebRequest request) { + @SuppressWarnings("unchecked") + Function, Publisher> function = (Function, Publisher>) request + .getAttribute(WebRequestConstants.FUNCTION, WebRequest.SCOPE_REQUEST); + @SuppressWarnings("unchecked") + Consumer> consumer = (Consumer>) request + .getAttribute(WebRequestConstants.CONSUMER, WebRequest.SCOPE_REQUEST); + @SuppressWarnings("unchecked") + Supplier> supplier = (Supplier>) request + .getAttribute(WebRequestConstants.SUPPLIER, WebRequest.SCOPE_REQUEST); + FunctionWrapper wrapper = RequestProcessor.wrapper(function, consumer, supplier); + for (String key : request.getParameterMap().keySet()) { + wrapper.params().addAll(key, Arrays.asList(request.getParameterValues(key))); + } + for (Iterator keys = request.getHeaderNames(); keys.hasNext();) { + String key = keys.next(); + wrapper.headers().addAll(key, Arrays.asList(request.getHeaderValues(key))); + } + String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT, + WebRequest.SCOPE_REQUEST); + wrapper.argument(argument); + return wrapper; + } +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionHandlerMapping.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionHandlerMapping.java new file mode 100644 index 000000000..74a3f312a --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionHandlerMapping.java @@ -0,0 +1,163 @@ +/* + * Copyright 2012-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.mvc; + +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import javax.servlet.http.HttpServletRequest; + +import org.reactivestreams.Publisher; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.web.constants.WebRequestConstants; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.StringUtils; +import org.springframework.web.method.HandlerMethod; +import org.springframework.web.servlet.HandlerMapping; +import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping; + +/** + * @author Dave Syer + * + */ +@Configuration +@ConditionalOnClass(RequestMappingHandlerMapping.class) +public class FunctionHandlerMapping extends RequestMappingHandlerMapping + implements InitializingBean { + + private final FunctionCatalog functions; + + private final FunctionController controller; + + @Value("${spring.cloud.function.web.path:}") + private String prefix = ""; + + @Autowired + public FunctionHandlerMapping(FunctionCatalog catalog, + FunctionController controller) { + this.functions = catalog; + logger.info("FunctionCatalog: " + catalog); + setOrder(super.getOrder() - 5); + this.controller = controller; + } + + @Override + public void afterPropertiesSet() { + super.afterPropertiesSet(); + detectHandlerMethods(controller); + while (prefix.endsWith("/")) { + prefix = prefix.substring(0, prefix.length() - 1); + } + } + + @Override + protected void initHandlerMethods() { + } + + @Override + protected HandlerMethod getHandlerInternal(HttpServletRequest request) + throws Exception { + HandlerMethod handler = super.getHandlerInternal(request); + if (handler == null) { + return null; + } + String path = (String) request + .getAttribute(HandlerMapping.PATH_WITHIN_HANDLER_MAPPING_ATTRIBUTE); + if (StringUtils.hasText(prefix) && !path.startsWith(prefix)) { + return null; + } + if (path.startsWith(prefix)) { + path = path.substring(prefix.length()); + } + if (path == null) { + return handler; + } + Object function = findFunctionForGet(request, path); + if (function != null) { + if (logger.isDebugEnabled()) { + logger.debug("Found function for GET: " + path); + } + request.setAttribute(WebRequestConstants.HANDLER, function); + return handler; + } + function = findFunctionForPost(request, path); + if (function != null) { + if (logger.isDebugEnabled()) { + logger.debug("Found function for POST: " + path); + } + request.setAttribute(WebRequestConstants.HANDLER, function); + return handler; + } + return null; + } + + private Object findFunctionForPost(HttpServletRequest request, String path) { + if (!request.getMethod().equals("POST")) { + return null; + } + path = path.startsWith("/") ? path.substring(1) : path; + Consumer> consumer = functions.lookup(Consumer.class, path); + if (consumer != null) { + request.setAttribute(WebRequestConstants.CONSUMER, consumer); + return consumer; + } + Function function = functions.lookup(Function.class, path); + if (function != null) { + request.setAttribute(WebRequestConstants.FUNCTION, function); + return function; + } + return null; + } + + private Object findFunctionForGet(HttpServletRequest request, String path) { + if (!request.getMethod().equals("GET")) { + return null; + } + path = path.startsWith("/") ? path.substring(1) : path; + Supplier> supplier = functions.lookup(Supplier.class, path); + if (supplier != null) { + request.setAttribute(WebRequestConstants.SUPPLIER, supplier); + return supplier; + } + StringBuilder builder = new StringBuilder(); + String name = path; + String value = null; + for (String element : path.split("/")) { + if (builder.length() > 0) { + builder.append("/"); + } + builder.append(element); + name = builder.toString(); + value = path.length() > name.length() ? path.substring(name.length() + 1) + : null; + Function function = functions.lookup(Function.class, name); + if (function != null) { + request.setAttribute(WebRequestConstants.FUNCTION, function); + request.setAttribute(WebRequestConstants.ARGUMENT, value); + return function; + } + } + return null; + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/ReactorAutoConfiguration.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/ReactorAutoConfiguration.java new file mode 100644 index 000000000..deb9018a9 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/ReactorAutoConfiguration.java @@ -0,0 +1,92 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.mvc; + +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication; +import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication.Type; +import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration; +import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; +import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.web.RequestProcessor; +import org.springframework.cloud.function.web.StringConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.core.convert.ConversionService; +import org.springframework.core.convert.support.DefaultConversionService; +import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; + +import reactor.core.publisher.Flux; + +/** + * @author Dave Syer + * @author Mark Fisher + * @author Oleg Zhurakousky + */ +@Configuration +@ConditionalOnWebApplication(type=Type.SERVLET) +@ConditionalOnClass({ Flux.class, AsyncHandlerMethodReturnValueHandler.class }) +@Import({ FunctionController.class, RequestProcessor.class }) +@AutoConfigureAfter({ JacksonAutoConfiguration.class, GsonAutoConfiguration.class }) +public class ReactorAutoConfiguration { + + @Bean + public FunctionHandlerMapping functionHandlerMapping(FunctionCatalog catalog, + FunctionController controller) { + return new FunctionHandlerMapping(catalog, controller); + } + + @Bean + @ConditionalOnMissingBean + public StringConverter functionStringConverter(FunctionInspector inspector, + ConfigurableListableBeanFactory beanFactory) { + return new BasicStringConverter(inspector, beanFactory); + } + + private static class BasicStringConverter implements StringConverter { + + private ConversionService conversionService; + private ConfigurableListableBeanFactory registry; + private FunctionInspector inspector; + + public BasicStringConverter(FunctionInspector inspector, + ConfigurableListableBeanFactory registry) { + this.inspector = inspector; + this.registry = registry; + } + + @Override + public Object convert(Object function, String value) { + if (conversionService == null && registry != null) { + ConversionService conversionService = this.registry + .getConversionService(); + this.conversionService = conversionService != null ? conversionService + : new DefaultConversionService(); + } + Class type = inspector.getInputType(function); + return conversionService.canConvert(String.class, type) + ? conversionService.convert(value, type) + : value; + } + + } +} diff --git a/spring-cloud-function-web/src/main/resources/META-INF/spring.factories b/spring-cloud-function-web/src/main/resources/META-INF/spring.factories index fd9ea75b8..f36438425 100644 --- a/spring-cloud-function-web/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-function-web/src/main/resources/META-INF/spring.factories @@ -1,5 +1,6 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -org.springframework.cloud.function.web.flux.ReactorAutoConfiguration +org.springframework.cloud.function.web.flux.ReactorAutoConfiguration,\ +org.springframework.cloud.function.web.mvc.ReactorAutoConfiguration org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc=\ org.springframework.cloud.function.web.flux.ReactorAutoConfiguration,\ diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/flux/FluxRestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/flux/FluxRestApplicationTests.java new file mode 100644 index 000000000..8a43c0ea9 --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/flux/FluxRestApplicationTests.java @@ -0,0 +1,416 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.function.flux; + +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.reactivestreams.Publisher; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.cloud.function.flux.FluxRestApplicationTests.TestConfiguration; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.StringUtils; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Tests for vanilla MVC handling (no function layer). Validates the MVC customizations + * that are added in this project independently of the specific concerns of function. + * + * @author Dave Syer + * + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = TestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=reactive") +public class FluxRestApplicationTests { + + private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); + @LocalServerPort + private int port; + @Autowired + private TestRestTemplate rest; + @Autowired + private TestConfiguration test; + + @Before + public void init() { + test.list.clear(); + } + + @Test + public void wordsSSE() throws Exception { + assertThat(rest.exchange( + RequestEntity.get(new URI("/words")).accept(EVENT_STREAM).build(), + String.class).getBody()).isEqualTo(sse("foo", "bar")); + } + + @Test + public void wordsJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/words")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + @Ignore("Fix error handling") + public void errorJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/bang")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\"]"); + } + + @Test + public void words() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/words")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + public void foos() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/foos")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"); + } + + @Test + public void getMore() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/get/more")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + @Ignore("Should this even work? Or do we need to be explicit about the JSON?") + public void updates() throws Exception { + ResponseEntity result = rest.exchange( + RequestEntity.post(new URI("/updates")).body("one\ntwo"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()).isEqualTo("onetwo"); + } + + @Test + public void updatesJson() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/updates")).contentType(MediaType.APPLICATION_JSON) + .body("[\"one\",\"two\"]"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]"); + } + + @Test + public void addFoos() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/addFoos")).contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"); + } + + @Test + public void timeout() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/timeout")).build(), String.class) + .getBody()).isEqualTo("[\"foo\"]"); + } + + @Test + public void emptyJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/empty")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[]"); + } + + @Test + public void sentences() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/sentences")).build(), String.class) + .getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); + } + + @Test + public void sentencesAcceptAny() throws Exception { + assertThat(rest.exchange( + RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(), + String.class).getBody()) + .isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); + } + + @Test + public void sentencesAcceptJson() throws Exception { + ResponseEntity result = rest + .exchange( + RequestEntity.get(new URI("/sentences")) + .accept(MediaType.APPLICATION_JSON).build(), + String.class); + assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); + assertThat(result.getHeaders().getContentType()) + .isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON); + } + + @Test + public void uppercase() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]"); + } + + @Test + public void uppercaseFoos() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/upFoos")).contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]"); + } + + @Test + public void transform() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/transform")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]"); + } + + @Test + public void postMore() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/post/more")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]"); + } + + @Test + public void uppercaseGet() throws Exception { + assertThat(rest.exchange(RequestEntity.get(new URI("/uppercase/foo")) + .accept(MediaType.TEXT_PLAIN).build(), String.class).getBody()) + .isEqualTo("[FOO]"); + } + + @Test + public void convertGet() throws Exception { + assertThat(rest.exchange(RequestEntity.get(new URI("/wrap/123")) + .accept(MediaType.TEXT_PLAIN).build(), String.class).getBody()) + .isEqualTo("..123.."); + } + + @Test + public void convertGetJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/entity/321")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("{\"value\":321}"); + } + + @Test + public void uppercaseJsonStream() throws Exception { + assertThat( + rest.exchange( + RequestEntity.post(new URI("/maps")) + .contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), + String.class).getBody()) + .isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]"); + } + + @Test + public void uppercaseSSE() throws Exception { + assertThat(rest.exchange(RequestEntity.post(new URI("/uppercase")) + .accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class).getBody()) + .isEqualTo(sse("[FOO]", "[BAR]")); + } + + @Test + public void altSSE() throws Exception { + assertThat(rest.exchange(RequestEntity.post(new URI("/alt")).accept(EVENT_STREAM) + .contentType(MediaType.APPLICATION_JSON).body("[\"foo\",\"bar\"]"), + String.class).getBody()).isEqualTo(sse("[FOO]", "[BAR]")); + } + + private String sse(String... values) { + return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n"; + } + + @EnableAutoConfiguration + @RestController + @Configuration + public static class TestConfiguration { + + private List list = new ArrayList<>(); + + @PostMapping({ "/uppercase", "/transform", "/post/more" }) + public Flux uppercase(@RequestBody List flux) { + return Flux.fromIterable(flux).log() + .map(value -> "[" + value.trim().toUpperCase() + "]"); + } + + @PostMapping({ "/alt" }) + public Mono> alt(@RequestBody List flux) { + Publisher result = Flux.fromIterable(flux) + .map(value -> "[" + value.trim().toUpperCase() + "]"); + return Flux.from(result).log() + .then(Mono.fromSupplier(() -> ResponseEntity.ok(result))); + } + + @PostMapping("/upFoos") + public Flux upFoos(@RequestBody List list) { + return Flux.fromIterable(list).log() + .map(value -> new Foo(value.getValue().trim().toUpperCase())); + } + + @GetMapping("/uppercase/{id}") + public Mono> uppercaseGet(@PathVariable String id) { + return Mono.just(id).map(value -> "[" + value.trim().toUpperCase() + "]") + .flatMap(body -> Mono.just(ResponseEntity.ok(body))); + } + + @GetMapping("/wrap/{id}") + public Mono> wrapGet(@PathVariable int id) { + return Mono.just(id).log().map(value -> ".." + value + "..") + .flatMap(body -> Mono.just(ResponseEntity.ok(body))); + } + + @GetMapping("/entity/{id}") + public Mono> entity(@PathVariable Integer id) { + return Mono.just(id).log() + .map(value -> Collections.singletonMap("value", value)); + } + + @PostMapping("/maps") + public Flux> maps( + @RequestBody List> flux) { + return Flux.fromIterable(flux).map(value -> { + value.put("value", value.get("value").trim().toUpperCase()); + return value; + }); + } + + @GetMapping({ "/words", "/get/more" }) + public Flux words() { + return Flux.fromArray(new String[] { "foo", "bar" }); + } + + @GetMapping("/foos") + public Flux foos() { + return Flux.just(new Foo("foo"), new Foo("bar")); + } + + @PostMapping("/updates") + @ResponseStatus(HttpStatus.ACCEPTED) + public Flux updates(@RequestBody List list) { + Flux flux = Flux.fromIterable(list).cache(); + flux.subscribe(value -> this.list.add(value)); + return flux; + } + + @PostMapping("/addFoos") + @ResponseStatus(HttpStatus.ACCEPTED) + public Flux addFoos(@RequestBody List list) { + Flux flux = Flux.fromIterable(list).cache(); + flux.subscribe(value -> this.list.add(value.getValue())); + return flux; + } + + @GetMapping("/bang") + public Flux bang() { + return Flux.fromArray(new String[] { "foo", "bar" }).map(value -> { + if (value.equals("bar")) { + throw new RuntimeException("Bar"); + } + return value; + }); + } + + @GetMapping("/empty") + public Flux empty() { + return Flux.fromIterable(Collections.emptyList()); + } + + @GetMapping("/timeout") + public Flux timeout() { + return Flux.defer(() -> Flux.create(emitter -> { + emitter.next("foo"); + }).timeout(Duration.ofMillis(100L), Flux.empty())); + } + + @GetMapping("/sentences") + public Flux> sentences() { + return Flux.just(Arrays.asList("go", "home"), Arrays.asList("come", "back")); + } + + } + + public static class Foo { + private String value; + + public Foo(String value) { + this.value = value; + } + + Foo() { + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + } +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/mvc/MvcRestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/mvc/MvcRestApplicationTests.java index 71f7d3ea4..3aaff9321 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/mvc/MvcRestApplicationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/mvc/MvcRestApplicationTests.java @@ -61,8 +61,8 @@ import reactor.core.publisher.Mono; * @author Dave Syer * */ +@SpringBootTest(classes = TestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=servlet") @RunWith(SpringRunner.class) -@SpringBootTest(classes = TestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT) public class MvcRestApplicationTests { private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); @@ -253,11 +253,13 @@ public class MvcRestApplicationTests { @Test public void uppercaseJsonStream() throws Exception { - assertThat(rest - .exchange(RequestEntity.post(new URI("/maps")) - .contentType(MediaType.APPLICATION_JSON) - .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class) - .getBody()).isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]"); + assertThat( + rest.exchange( + RequestEntity.post(new URI("/maps")) + .contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), + String.class).getBody()) + .isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]"); } @Test diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/HeadersToMessageTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HeadersToMessageTests.java similarity index 73% rename from spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/HeadersToMessageTests.java rename to spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HeadersToMessageTests.java index ac718ee7c..dab837e4b 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/HeadersToMessageTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HeadersToMessageTests.java @@ -13,34 +13,41 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.cloud.function.web; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +package org.springframework.cloud.function.web.flux; import java.net.URI; import java.util.function.Function; import org.junit.Test; import org.junit.runner.RunWith; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.cloud.function.web.RestApplication; +import org.springframework.cloud.function.web.flux.HeadersToMessageTests.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.http.ResponseEntity; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * * @author Oleg Zhurakousky * */ @RunWith(SpringRunner.class) -@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = {"spring.cloud.function.web.path=/functions" }) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = { + "spring.cloud.function.web.path=/functions", + "spring.main.web-application-type=reactive" }) +@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class}) public class HeadersToMessageTests { @Autowired @@ -48,26 +55,27 @@ public class HeadersToMessageTests { @Test public void testBodyAndCustomHeaderFromMessagePropagation() throws Exception { - ResponseEntity postForEntity = rest.postForEntity(new URI("/functions/employee"), "{\"name\":\"Bob\",\"age\":25}", String.class); + ResponseEntity postForEntity = rest.postForEntity( + new URI("/functions/employee"), "{\"name\":\"Bob\",\"age\":25}", + String.class); assertEquals("{\"name\":\"Bob\",\"age\":25}", postForEntity.getBody()); assertTrue(postForEntity.getHeaders().containsKey("x-content-type")); - assertEquals("application/xml", postForEntity.getHeaders().get("x-content-type").get(0)); + assertEquals("application/xml", + postForEntity.getHeaders().get("x-content-type").get(0)); assertEquals("bar", postForEntity.getHeaders().get("foo").get(0)); } @EnableAutoConfiguration @org.springframework.boot.test.context.TestConfiguration protected static class TestConfiguration { - @Bean({ "employee"}) - public Function, Message> function() { + @Bean({ "employee" }) + public Function, Message> function() { return request -> { Message message = MessageBuilder.withPayload(request.getPayload()) .setHeader("X-Content-Type", "application/xml") - .setHeader("foo", "bar") - .build(); + .setHeader("foo", "bar").build(); return message; }; - } + } } } - diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpGetIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpGetIntegrationTests.java new file mode 100644 index 000000000..21b3152d4 --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpGetIntegrationTests.java @@ -0,0 +1,360 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.function.web.flux; + +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.cloud.function.web.RestApplication; +import org.springframework.cloud.function.web.flux.HttpGetIntegrationTests.ApplicationConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.MultiValueMap; +import org.springframework.util.StringUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; + +/** + * @author Dave Syer + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=reactive") +@ContextConfiguration(classes= {RestApplication.class, ApplicationConfiguration.class}) +public class HttpGetIntegrationTests { + + private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM; + @LocalServerPort + private int port; + @Autowired + private TestRestTemplate rest; + @Autowired + private ApplicationConfiguration test; + + @Before + public void init() { + test.list.clear(); + } + + @Test + public void staticResource() { + assertThat(rest.getForObject("/test.html", String.class)).contains("Test"); + } + + @Test + public void wordsSSE() throws Exception { + assertThat(rest.exchange( + RequestEntity.get(new URI("/words")).accept(EVENT_STREAM).build(), + String.class).getBody()).isEqualTo(sse("foo", "bar")); + } + + @Test + public void wordsJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/words")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + @Ignore("Fix error handling") + public void errorJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/bang")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\"]"); + } + + @Test + public void words() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/words")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + public void word() throws Exception { + ResponseEntity result = rest.exchange( + RequestEntity.get(new URI("/word")).accept(MediaType.TEXT_PLAIN).build(), + String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("foo"); + } + + @Test + public void foos() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/foos")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"); + } + + @Test + public void getMore() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/get/more")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + public void bareWords() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/bareWords")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + public void timeoutJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/timeout")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\"]"); + } + + @Test + public void emptyJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/empty")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[]"); + } + + @Test + public void sentences() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/sentences")).build(), String.class) + .getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); + } + + @Test + public void sentencesAcceptAny() throws Exception { + assertThat(rest.exchange( + RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(), + String.class).getBody()) + .isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); + } + + @Test + public void sentencesAcceptJson() throws Exception { + ResponseEntity result = rest + .exchange( + RequestEntity.get(new URI("/sentences")) + .accept(MediaType.APPLICATION_JSON).build(), + String.class); + assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); + assertThat(result.getHeaders().getContentType()) + .isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON); + } + + @Test + public void sentencesAcceptSse() throws Exception { + ResponseEntity result = rest.exchange( + RequestEntity.get(new URI("/sentences")).accept(EVENT_STREAM).build(), + String.class); + assertThat(result.getBody()) + .isEqualTo(sse("[\"go\",\"home\"]", "[\"come\",\"back\"]")); + assertThat(result.getHeaders().getContentType().isCompatibleWith(EVENT_STREAM)) + .isTrue(); + } + + @Test + public void postMoreFoo() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .get(new URI("/post/more/foo")).accept(MediaType.TEXT_PLAIN).build(), + String.class); + assertThat(result.getBody()).isEqualTo("(FOO)"); + } + + @Test + public void uppercaseGet() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .get(new URI("/uppercase/foo")).accept(MediaType.TEXT_PLAIN).build(), + String.class); + assertThat(result.getBody()).isEqualTo("(FOO)"); + } + + @Test + public void convertGet() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .get(new URI("/wrap/123")).accept(MediaType.TEXT_PLAIN).build(), + String.class); + assertThat(result.getBody()).isEqualTo("..123.."); + } + + @Test + public void supplierFirst() { + assertThat(rest.getForObject("/not/a/function", String.class)) + .isEqualTo("[\"hello\"]"); + } + + @Test + public void convertGetJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/entity/321")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("{\"value\":321}"); + } + + private String sse(String... values) { + return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n"; + } + + @EnableAutoConfiguration + @TestConfiguration + public static class ApplicationConfiguration { + + private List list = new ArrayList<>(); + + public static void main(String[] args) throws Exception { + SpringApplication.run(HttpGetIntegrationTests.ApplicationConfiguration.class, + args); + } + + @Bean({ "uppercase", "post/more" }) + public Function, Flux> uppercase() { + return flux -> flux.log() + .map(value -> "(" + value.trim().toUpperCase() + ")"); + } + + @Bean + public Function, Flux> wrap() { + return flux -> flux.log().map(value -> ".." + value + ".."); + } + + @Bean + public Function, Flux>> entity() { + return flux -> flux.log() + .map(value -> Collections.singletonMap("value", value)); + } + + @Bean({ "words", "get/more" }) + public Supplier> words() { + return () -> Flux.just("foo", "bar"); + } + + @Bean + public Supplier word() { + return () -> "foo"; + } + + @Bean + public Supplier> foos() { + return () -> Flux.just(new Foo("foo"), new Foo("bar")); + } + + @Bean + public Supplier> bareWords() { + return () -> Arrays.asList("foo", "bar"); + } + + @Bean + public Supplier> bang() { + return () -> Flux.fromArray(new String[] { "foo", "bar" }).map(value -> { + if (value.equals("bar")) { + throw new RuntimeException("Bar"); + } + return value; + }); + } + + @Bean + public Supplier> empty() { + return () -> Flux.fromIterable(Collections.emptyList()); + } + + @Bean("not/a/function") + public Supplier> supplier() { + return () -> Flux.just("hello"); + } + + @Bean("not/a") + public Function, Flux> function() { + return input -> Flux.just("bye"); + } + + @Bean + public Supplier> timeout() { + return () -> Flux.defer(() -> Flux.create(emitter -> { + emitter.next("foo"); + }).timeout(Duration.ofMillis(100L), Flux.empty())); + } + + @Bean + public Supplier>> sentences() { + return () -> Flux.just(Arrays.asList("go", "home"), + Arrays.asList("come", "back")); + } + + @Bean + public Function, Map> sum() { + return valueMap -> valueMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, values -> values + .getValue().stream().mapToInt(Integer::parseInt).sum())); + } + + } + + public static class Foo { + private String value; + + public Foo(String value) { + this.value = value; + } + + Foo() { + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + } + +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java new file mode 100644 index 000000000..581f9f66a --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java @@ -0,0 +1,441 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.function.web.flux; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.cloud.function.web.RestApplication; +import org.springframework.cloud.function.web.flux.HttpPostIntegrationTests.ApplicationConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.util.StringUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * @author Dave Syer + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=reactive") +@ContextConfiguration(classes = { RestApplication.class, ApplicationConfiguration.class }) +public class HttpPostIntegrationTests { + + private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM; + @LocalServerPort + private int port; + @Autowired + private TestRestTemplate rest; + @Autowired + private ApplicationConfiguration test; + + @Before + public void init() { + test.list.clear(); + } + + @Test + public void qualifierFoos() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity.post(new URI("/foos")) + .contentType(MediaType.APPLICATION_JSON).body("[\"foo\",\"bar\"]"), + String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"[FOO]\"},{\"value\":\"[BAR]\"}]"); + } + + @Test + public void updates() throws Exception { + ResponseEntity result = rest.exchange( + RequestEntity.post(new URI("/updates")).body("[\"one\", \"two\"]"), + String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()).isNull(); + } + + @Test + public void updatesJson() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/updates")).contentType(MediaType.APPLICATION_JSON) + .body("[\"one\",\"two\"]"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()).isEqualTo(null); + } + + @Test + public void addFoos() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/addFoos")).contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()).isEqualTo(null); + } + + @Test + public void bareUpdates() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/bareUpdates")).contentType(MediaType.APPLICATION_JSON) + .body("[\"one\",\"two\"]"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()).isEqualTo("[]"); + } + + @Test + public void uppercase() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]"); + } + + @Test + public void messages() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/messages")).contentType(MediaType.APPLICATION_JSON) + .header("x-foo", "bar").body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getHeaders().getFirst("x-foo")).isEqualTo("bar"); + assertThat(result.getHeaders()).doesNotContainKey("id"); + assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]"); + } + + @Test + public void headers() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/headers")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getHeaders().getFirst("foo")).isEqualTo("bar"); + assertThat(result.getHeaders()).doesNotContainKey("id"); + assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]"); + } + + @Test + public void uppercaseSingleValue() throws Exception { + ResponseEntity result = rest + .exchange( + RequestEntity.post(new URI("/uppercase")) + .contentType(MediaType.TEXT_PLAIN).body("foo"), + String.class); + assertThat(result.getBody()).isEqualTo("[\"(FOO)\"]"); + } + + @Test + @Ignore("WebFlux would split the request body into lines: TODO make this work the same") + public void uppercasePlainText() throws Exception { + ResponseEntity result = rest.exchange( + RequestEntity.post(new URI("/uppercase")) + .contentType(MediaType.TEXT_PLAIN).body("foo\nbar"), + String.class); + assertThat(result.getBody()).isEqualTo("(FOO\nBAR)"); + } + + @Test + public void uppercaseFoos() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/upFoos")).contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]"); + } + + @Test + public void uppercaseFoo() throws Exception { + // Single Foo can be parsed + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/upFoos")).contentType(MediaType.APPLICATION_JSON) + .body("{\"value\":\"foo\"}"), String.class); + assertThat(result.getBody()).isEqualTo("[{\"value\":\"FOO\"}]"); + } + + @Test + public void bareUppercaseFoos() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/bareUpFoos")).contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]"); + } + + @Test + public void bareUppercaseFoo() throws Exception { + // Single Foo can be parsed and returns a single value if the function is defined + // that way + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/bareUpFoos")).contentType(MediaType.APPLICATION_JSON) + .body("{\"value\":\"foo\"}"), String.class); + assertThat(result.getBody()).isEqualTo("{\"value\":\"FOO\"}"); + } + + @Test + public void bareUppercase() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/bareUppercase")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]"); + } + + @Test + public void singleValuedText() throws Exception { + ResponseEntity result = rest.exchange( + RequestEntity.post(new URI("/bareUppercase")).accept(MediaType.TEXT_PLAIN) + .contentType(MediaType.TEXT_PLAIN).body("foo"), + String.class); + assertThat(result.getBody()).isEqualTo("(FOO)"); + } + + @Test + public void transform() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/transform")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]"); + } + + @Test + public void postMore() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/post/more")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]"); + } + + @Test + public void convertPost() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity.post(new URI("/wrap")) + .contentType(MediaType.TEXT_PLAIN).body("123"), String.class); + // Result is multi-valued so it has to come out as an array + assertThat(result.getBody()).isEqualTo("[\"..123..\"]"); + } + + @Test + public void convertPostJson() throws Exception { + // If you POST a single value to a Function,Flux> it can't + // determine if the output is single valued, so it has to send an array back + ResponseEntity result = rest + .exchange( + RequestEntity.post(new URI("/doubler")) + .contentType(MediaType.TEXT_PLAIN).body("123"), + String.class); + assertThat(result.getBody()).isEqualTo("[246]"); + } + + @Test + public void uppercaseJsonArray() throws Exception { + assertThat(rest.exchange( + RequestEntity.post(new URI("/maps")) + .contentType(MediaType.APPLICATION_JSON) + // The new line in the middle is optional + .body("[{\"value\":\"foo\"},\n{\"value\":\"bar\"}]"), + String.class).getBody()) + .isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]"); + } + + @Test + public void uppercaseSSE() throws Exception { + assertThat(rest.exchange(RequestEntity.post(new URI("/uppercase")) + .accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class).getBody()) + .isEqualTo(sse("(FOO)", "(BAR)")); + } + + @Test + public void sum() throws Exception { + + LinkedMultiValueMap map = new LinkedMultiValueMap<>(); + + map.put("A", Arrays.asList("1", "2", "3")); + map.put("B", Arrays.asList("5", "6")); + + assertThat(rest.exchange( + RequestEntity.post(new URI("/sum")).accept(MediaType.APPLICATION_JSON) + .contentType(MediaType.APPLICATION_FORM_URLENCODED).body(map), + String.class).getBody()).isEqualTo("[{\"A\":6,\"B\":11}]"); + } + + @Test + public void count() throws Exception { + List list = Arrays.asList("A", "B", "A"); + assertThat(rest.exchange( + RequestEntity.post(new URI("/count")).accept(MediaType.APPLICATION_JSON) + .contentType(MediaType.APPLICATION_JSON).body(list), + String.class).getBody()).isEqualTo("{\"A\":2,\"B\":1}"); + } + + private String sse(String... values) { + return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n"; + } + + @EnableAutoConfiguration + @TestConfiguration + public static class ApplicationConfiguration { + + private List list = new ArrayList<>(); + + public static void main(String[] args) throws Exception { + SpringApplication.run(HttpPostIntegrationTests.ApplicationConfiguration.class, + args); + } + + @Bean({ "uppercase", "transform", "post/more" }) + public Function, Flux> uppercase() { + return flux -> flux.log() + .map(value -> "(" + value.trim().toUpperCase() + ")"); + } + + @Bean + public Function bareUppercase() { + return value -> "(" + value.trim().toUpperCase() + ")"; + } + + @Bean + public Function, Message> messages() { + return value -> MessageBuilder + .withPayload("(" + value.getPayload().trim().toUpperCase() + ")") + .copyHeaders(value.getHeaders()).build(); + } + + @Bean + public Function>, Flux>> headers() { + return flux -> flux.map(value -> MessageBuilder + .withPayload("(" + value.getPayload().trim().toUpperCase() + ")") + .setHeader("foo", "bar").build()); + } + + @Bean + public Function, Flux> upFoos() { + return flux -> flux.log() + .map(value -> new Foo(value.getValue().trim().toUpperCase())); + } + + @Bean + public Function bareUpFoos() { + return value -> new Foo(value.getValue().trim().toUpperCase()); + } + + @Bean + public Function, Flux> wrap() { + return flux -> flux.log().map(value -> ".." + value + ".."); + } + + @Bean + public Function, Flux> doubler() { + return flux -> flux.log().map(value -> 2 * value); + } + + @Bean + public Function>, Flux>> maps() { + return flux -> flux.map(value -> { + value.put("value", value.get("value").trim().toUpperCase()); + return value; + }); + } + + @Bean + @Qualifier("foos") + public Function qualifier() { + return value -> new Foo("[" + value.trim().toUpperCase() + "]"); + } + + @Bean + public Consumer> updates() { + return flux -> flux.subscribe(value -> list.add(value)); + } + + @Bean + public Consumer> addFoos() { + return flux -> flux.subscribe(value -> list.add(value.getValue())); + } + + @Bean + public Consumer bareUpdates() { + return value -> list.add(value); + } + + @Bean("not/a") + public Function, Flux> function() { + return input -> Flux.just("bye"); + } + + @Bean + public Function, Map> sum() { + return valueMap -> valueMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, values -> values + .getValue().stream().mapToInt(Integer::parseInt).sum())); + } + + @Bean + public Function, Mono>> count() { + return flux -> flux.collect(HashMap::new, + (map, word) -> map.merge(word, 1, Integer::sum)); + } + } + + public static class Foo { + private String value; + + public Foo(String value) { + this.value = value; + } + + Foo() { + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + } + +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/PrefixTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/PrefixTests.java similarity index 87% rename from spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/PrefixTests.java rename to spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/PrefixTests.java index d0b36a0cd..d49ad1cee 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/PrefixTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/PrefixTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.cloud.function.web; +package org.springframework.cloud.function.web.flux; import java.net.URI; import java.util.function.Supplier; @@ -28,10 +28,13 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.cloud.function.web.RestApplication; +import org.springframework.cloud.function.web.flux.PrefixTests.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.http.HttpStatus; import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; +import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import static org.assertj.core.api.Assertions.assertThat; @@ -44,7 +47,9 @@ import reactor.core.publisher.Flux; */ @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = { + "spring.main.web-application-type=reactive", "spring.cloud.function.web.path=/functions", "debug" }) +@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class}) public class PrefixTests { @LocalServerPort diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/SingletonTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/SingletonTests.java new file mode 100644 index 000000000..d0ce2006f --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/SingletonTests.java @@ -0,0 +1,102 @@ +/* + * Copyright 2012-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import java.net.URI; +import java.util.function.Supplier; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.beans.factory.support.BeanDefinitionRegistry; +import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor; +import org.springframework.beans.factory.support.RootBeanDefinition; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.cloud.function.web.RestApplication; +import org.springframework.cloud.function.web.flux.SingletonTests.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.HttpStatus; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; + +/** + * @author Dave Syer + * + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=reactive") +@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class}) +public class SingletonTests { + + @LocalServerPort + private int port; + @Autowired + private TestRestTemplate rest; + + @Test + public void words() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/words")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @EnableAutoConfiguration + @org.springframework.boot.test.context.TestConfiguration + protected static class TestConfiguration { + @Bean + public static BeanDefinitionRegistryPostProcessor processor() { + return new BeanDefinitionRegistryPostProcessor() { + + @Override + public void postProcessBeanFactory( + ConfigurableListableBeanFactory beanFactory) + throws BeansException { + } + + @Override + public void postProcessBeanDefinitionRegistry( + BeanDefinitionRegistry registry) throws BeansException { + // Simulates what happens when you add a compiled function + RootBeanDefinition beanDefinition = new RootBeanDefinition( + MySupplier.class); + registry.registerBeanDefinition("words", beanDefinition); + } + }; + } + } + + static class MySupplier implements Supplier> { + @Override + public Flux get() { + return Flux.just("foo", "bar"); + } + } +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/DefaultRouteTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/DefaultRouteTests.java similarity index 88% rename from spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/DefaultRouteTests.java rename to spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/DefaultRouteTests.java index 84b03458f..4e07e9fa1 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/DefaultRouteTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/DefaultRouteTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.cloud.function.web; +package org.springframework.cloud.function.web.mvc; import java.net.URI; import java.util.function.Function; @@ -29,10 +29,13 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.cloud.function.web.RestApplication; +import org.springframework.cloud.function.web.mvc.DefaultRouteTests.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.http.HttpStatus; import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; +import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import static org.assertj.core.api.Assertions.assertThat; @@ -45,6 +48,7 @@ import reactor.core.publisher.Flux; */ @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "") +@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class}) public class DefaultRouteTests { @LocalServerPort diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HeadersToMessageTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HeadersToMessageTests.java new file mode 100644 index 000000000..967eb3cfb --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HeadersToMessageTests.java @@ -0,0 +1,82 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.function.web.mvc; + +import java.net.URI; +import java.util.Map; +import java.util.function.Function; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.cloud.function.web.RestApplication; +import org.springframework.cloud.function.web.mvc.HeadersToMessageTests.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.ResponseEntity; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * + * @author Oleg Zhurakousky + * + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = { + "spring.main.web-application-type=servlet", "spring.cloud.function.web.path=/functions" }) +@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class}) +public class HeadersToMessageTests { + + @Autowired + private TestRestTemplate rest; + + @Test + public void testBodyAndCustomHeaderFromMessagePropagation() throws Exception { + ResponseEntity postForEntity = rest.postForEntity( + new URI("/functions/employee"), "{\"name\":\"Bob\",\"age\":25}", + String.class); + assertEquals("{\"name\":\"Bob\",\"age\":25}", postForEntity.getBody()); + assertTrue(postForEntity.getHeaders().containsKey("x-content-type")); + assertEquals("application/xml", + postForEntity.getHeaders().get("x-content-type").get(0)); + assertEquals("bar", postForEntity.getHeaders().get("foo").get(0)); + } + + @EnableAutoConfiguration + @org.springframework.boot.test.context.TestConfiguration + protected static class TestConfiguration { + @Bean({ "employee" }) + public Function>, Message>> function() { + return request -> { + Message> message = MessageBuilder + .withPayload(request.getPayload()) + .setHeader("X-Content-Type", "application/xml") + .setHeader("foo", "bar").build(); + return message; + }; + } + } +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpGetIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpGetIntegrationTests.java new file mode 100644 index 000000000..d1199b79b --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpGetIntegrationTests.java @@ -0,0 +1,350 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.function.web.mvc; + +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.cloud.function.web.RestApplication; +import org.springframework.cloud.function.web.mvc.HttpGetIntegrationTests.ApplicationConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.MultiValueMap; +import org.springframework.util.StringUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; + +/** + * @author Dave Syer + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties="spring.main.web-application-type=servlet") +@ContextConfiguration(classes= {RestApplication.class, ApplicationConfiguration.class}) +public class HttpGetIntegrationTests { + + private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM; + @LocalServerPort + private int port; + @Autowired + private TestRestTemplate rest; + @Autowired + private ApplicationConfiguration test; + + @Before + public void init() { + test.list.clear(); + } + + @Test + public void staticResource() { + assertThat(rest.getForObject("/test.html", String.class)).contains("Test"); + } + + @Test + public void wordsSSE() throws Exception { + assertThat(rest.exchange( + RequestEntity.get(new URI("/words")).accept(EVENT_STREAM).build(), + String.class).getBody()).isEqualTo(sse("foo", "bar")); + } + + @Test + public void wordsJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/words")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + @Ignore("Fix error handling") + public void errorJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/bang")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\"]"); + } + + @Test + public void words() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/words")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + public void word() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/word")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("foo"); + } + + @Test + public void foos() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/foos")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"); + } + + @Test + public void getMore() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/get/more")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + public void bareWords() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/bareWords")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + public void timeoutJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/timeout")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\"]"); + } + + @Test + public void emptyJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/empty")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[]"); + } + + @Test + public void sentences() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/sentences")).build(), String.class) + .getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); + } + + @Test + public void sentencesAcceptAny() throws Exception { + assertThat(rest.exchange( + RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(), + String.class).getBody()) + .isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); + } + + @Test + public void sentencesAcceptJson() throws Exception { + ResponseEntity result = rest + .exchange( + RequestEntity.get(new URI("/sentences")) + .accept(MediaType.APPLICATION_JSON).build(), + String.class); + assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); + assertThat(result.getHeaders().getContentType()) + .isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON); + } + + @Test + public void sentencesAcceptSse() throws Exception { + ResponseEntity result = rest.exchange( + RequestEntity.get(new URI("/sentences")).accept(EVENT_STREAM).build(), + String.class); + assertThat(result.getBody()) + .isEqualTo(sse("[\"go\",\"home\"]", "[\"come\",\"back\"]")); + assertThat(result.getHeaders().getContentType().isCompatibleWith(EVENT_STREAM)) + .isTrue(); + } + + @Test + public void postMoreFoo() { + assertThat(rest.getForObject("/post/more/foo", String.class)).isEqualTo("(FOO)"); + } + + @Test + public void uppercaseGet() { + assertThat(rest.getForObject("/uppercase/foo", String.class)).isEqualTo("(FOO)"); + } + + @Test + public void convertGet() { + assertThat(rest.getForObject("/wrap/123", String.class)).isEqualTo("..123.."); + } + + @Test + public void supplierFirst() { + assertThat(rest.getForObject("/not/a/function", String.class)) + .isEqualTo("[\"hello\"]"); + } + + @Test + public void convertGetJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/entity/321")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("{\"value\":321}"); + } + + private String sse(String... values) { + return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n"; + } + + @EnableAutoConfiguration + @TestConfiguration + public static class ApplicationConfiguration { + + private List list = new ArrayList<>(); + + public static void main(String[] args) throws Exception { + SpringApplication.run(HttpGetIntegrationTests.ApplicationConfiguration.class, + args); + } + + @Bean({ "uppercase", "post/more" }) + public Function, Flux> uppercase() { + return flux -> flux.log() + .map(value -> "(" + value.trim().toUpperCase() + ")"); + } + + @Bean + public Function, Flux> wrap() { + return flux -> flux.log().map(value -> ".." + value + ".."); + } + + @Bean + public Function, Flux>> entity() { + return flux -> flux.log() + .map(value -> Collections.singletonMap("value", value)); + } + + @Bean({ "words", "get/more" }) + public Supplier> words() { + return () -> Flux.just("foo", "bar"); + } + + @Bean + public Supplier word() { + return () -> "foo"; + } + + @Bean + public Supplier> foos() { + return () -> Flux.just(new Foo("foo"), new Foo("bar")); + } + + @Bean + public Supplier> bareWords() { + return () -> Arrays.asList("foo", "bar"); + } + + @Bean + public Supplier> bang() { + return () -> Flux.fromArray(new String[] { "foo", "bar" }).map(value -> { + if (value.equals("bar")) { + throw new RuntimeException("Bar"); + } + return value; + }); + } + + @Bean + public Supplier> empty() { + return () -> Flux.fromIterable(Collections.emptyList()); + } + + @Bean("not/a/function") + public Supplier> supplier() { + return () -> Flux.just("hello"); + } + + @Bean("not/a") + public Function, Flux> function() { + return input -> Flux.just("bye"); + } + + @Bean + public Supplier> timeout() { + return () -> Flux.defer(() -> Flux.create(emitter -> { + emitter.next("foo"); + }).timeout(Duration.ofMillis(100L), Flux.empty())); + } + + @Bean + public Supplier>> sentences() { + return () -> Flux.just(Arrays.asList("go", "home"), + Arrays.asList("come", "back")); + } + + @Bean + public Function, Map> sum() { + return valueMap -> valueMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, values -> values + .getValue().stream().mapToInt(Integer::parseInt).sum())); + } + + } + + public static class Foo { + private String value; + + public Foo(String value) { + this.value = value; + } + + Foo() { + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + } + +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpPostIntegrationTests.java similarity index 66% rename from spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java rename to spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpPostIntegrationTests.java index 4bf6fcbad..2f250c4c5 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpPostIntegrationTests.java @@ -13,19 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.cloud.function.web; +package org.springframework.cloud.function.web.mvc; import java.net.URI; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; import org.junit.Before; @@ -42,6 +39,8 @@ import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.cloud.function.web.RestApplication; +import org.springframework.cloud.function.web.mvc.HttpPostIntegrationTests.ApplicationConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -49,6 +48,7 @@ import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -63,8 +63,9 @@ import reactor.core.publisher.Mono; * @author Dave Syer */ @RunWith(SpringRunner.class) -@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) -public class RestApplicationTests { +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties="spring.main.web-application-type=servlet") +@ContextConfiguration(classes= {RestApplication.class, ApplicationConfiguration.class}) +public class HttpPostIntegrationTests { private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM; @LocalServerPort @@ -78,61 +79,6 @@ public class RestApplicationTests { public void init() { test.list.clear(); } - - @Test - public void staticResource() { - assertThat(rest.getForObject("/test.html", String.class)).contains("Test"); - } - - @Test - public void wordsSSE() throws Exception { - assertThat(rest.exchange( - RequestEntity.get(new URI("/words")).accept(EVENT_STREAM).build(), - String.class).getBody()).isEqualTo(sse("foo", "bar")); - } - - @Test - public void wordsJson() throws Exception { - assertThat(rest - .exchange(RequestEntity.get(new URI("/words")) - .accept(MediaType.APPLICATION_JSON).build(), String.class) - .getBody()).isEqualTo("[\"foo\",\"bar\"]"); - } - - @Test - @Ignore("Fix error handling") - public void errorJson() throws Exception { - assertThat(rest - .exchange(RequestEntity.get(new URI("/bang")) - .accept(MediaType.APPLICATION_JSON).build(), String.class) - .getBody()).isEqualTo("[\"foo\"]"); - } - - @Test - public void words() throws Exception { - ResponseEntity result = rest - .exchange(RequestEntity.get(new URI("/words")).build(), String.class); - assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); - assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); - } - - @Test - public void word() throws Exception { - ResponseEntity result = rest - .exchange(RequestEntity.get(new URI("/word")).build(), String.class); - assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); - assertThat(result.getBody()).isEqualTo("foo"); - } - - @Test - public void foos() throws Exception { - ResponseEntity result = rest - .exchange(RequestEntity.get(new URI("/foos")).build(), String.class); - assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); - assertThat(result.getBody()) - .isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"); - } - @Test public void qualifierFoos() throws Exception { ResponseEntity result = rest.exchange(RequestEntity.post(new URI("/foos")) @@ -144,29 +90,13 @@ public class RestApplicationTests { } @Test - public void getMore() throws Exception { - ResponseEntity result = rest - .exchange(RequestEntity.get(new URI("/get/more")).build(), String.class); - assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); - assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); - } - - @Test - public void bareWords() throws Exception { - ResponseEntity result = rest - .exchange(RequestEntity.get(new URI("/bareWords")).build(), String.class); - assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); - assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); - } - - @Test - @Ignore("Should this even work? Or do we need to be explicit about the JSON?") public void updates() throws Exception { ResponseEntity result = rest.exchange( - RequestEntity.post(new URI("/updates")).body("one\ntwo"), String.class); + RequestEntity.post(new URI("/updates")).body("[\"one\", \"two\"]"), + String.class); assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); assertThat(test.list).hasSize(2); - assertThat(result.getBody()).isEqualTo("onetwo"); + assertThat(result.getBody()).isNull(); } @Test @@ -176,7 +106,7 @@ public class RestApplicationTests { .body("[\"one\",\"two\"]"), String.class); assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); assertThat(test.list).hasSize(2); - assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]"); + assertThat(result.getBody()).isEqualTo(null); } @Test @@ -187,7 +117,7 @@ public class RestApplicationTests { assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); assertThat(test.list).hasSize(2); assertThat(result.getBody()) - .isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"); + .isEqualTo(null); } @Test @@ -200,60 +130,6 @@ public class RestApplicationTests { assertThat(result.getBody()).isEqualTo("[]"); } - @Test - public void timeoutJson() throws Exception { - assertThat(rest - .exchange(RequestEntity.get(new URI("/timeout")) - .accept(MediaType.APPLICATION_JSON).build(), String.class) - .getBody()).isEqualTo("[\"foo\"]"); - } - - @Test - public void emptyJson() throws Exception { - assertThat(rest - .exchange(RequestEntity.get(new URI("/empty")) - .accept(MediaType.APPLICATION_JSON).build(), String.class) - .getBody()).isEqualTo("[]"); - } - - @Test - public void sentences() throws Exception { - assertThat(rest - .exchange(RequestEntity.get(new URI("/sentences")).build(), String.class) - .getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); - } - - @Test - public void sentencesAcceptAny() throws Exception { - assertThat(rest.exchange( - RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(), - String.class).getBody()) - .isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); - } - - @Test - public void sentencesAcceptJson() throws Exception { - ResponseEntity result = rest - .exchange( - RequestEntity.get(new URI("/sentences")) - .accept(MediaType.APPLICATION_JSON).build(), - String.class); - assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); - assertThat(result.getHeaders().getContentType()) - .isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON); - } - - @Test - public void sentencesAcceptSse() throws Exception { - ResponseEntity result = rest.exchange( - RequestEntity.get(new URI("/sentences")).accept(EVENT_STREAM).build(), - String.class); - assertThat(result.getBody()) - .isEqualTo(sse("[\"go\",\"home\"]", "[\"come\",\"back\"]")); - assertThat(result.getHeaders().getContentType().isCompatibleWith(EVENT_STREAM)) - .isTrue(); - } - @Test public void uppercase() throws Exception { ResponseEntity result = rest.exchange(RequestEntity @@ -263,10 +139,11 @@ public class RestApplicationTests { } @Test - // @Ignore("FIXME") public void messages() throws Exception { ResponseEntity result = rest.exchange(RequestEntity .post(new URI("/messages")).contentType(MediaType.APPLICATION_JSON) + // Remove this when Spring 5.0.8 is used + .accept(MediaType.valueOf("application/stream+json")) .header("x-foo", "bar").body("[\"foo\",\"bar\"]"), String.class); assertThat(result.getHeaders().getFirst("x-foo")).isEqualTo("bar"); assertThat(result.getHeaders()).doesNotContainKey("id"); @@ -277,10 +154,12 @@ public class RestApplicationTests { public void headers() throws Exception { ResponseEntity result = rest.exchange(RequestEntity .post(new URI("/headers")).contentType(MediaType.APPLICATION_JSON) + // Remove this when Spring 5.0.8 is used + .accept(MediaType.valueOf("application/stream+json")) .body("[\"foo\",\"bar\"]"), String.class); - assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]"); assertThat(result.getHeaders().getFirst("foo")).isEqualTo("bar"); assertThat(result.getHeaders()).doesNotContainKey("id"); + assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]"); } @Test @@ -290,7 +169,8 @@ public class RestApplicationTests { RequestEntity.post(new URI("/uppercase")) .contentType(MediaType.TEXT_PLAIN).body("foo"), String.class); - assertThat(result.getBody()).isEqualTo("(FOO)"); + // Result is multi-valued so it has to come out as an array + assertThat(result.getBody()).isEqualTo("[\"(FOO)\"]"); } @Test @@ -374,26 +254,12 @@ public class RestApplicationTests { assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]"); } - @Test - public void postMoreFoo() { - assertThat(rest.getForObject("/post/more/foo", String.class)).isEqualTo("(FOO)"); - } - - @Test - public void uppercaseGet() { - assertThat(rest.getForObject("/uppercase/foo", String.class)).isEqualTo("(FOO)"); - } - - @Test - public void convertGet() { - assertThat(rest.getForObject("/wrap/123", String.class)).isEqualTo("..123.."); - } - @Test public void convertPost() throws Exception { ResponseEntity result = rest.exchange(RequestEntity.post(new URI("/wrap")) .contentType(MediaType.TEXT_PLAIN).body("123"), String.class); - assertThat(result.getBody()).isEqualTo("..123.."); + // Result is multi-valued so it has to come out as an array + assertThat(result.getBody()).isEqualTo("[\"..123..\"]"); } @Test @@ -408,20 +274,6 @@ public class RestApplicationTests { assertThat(result.getBody()).isEqualTo("[246]"); } - @Test - public void supplierFirst() { - assertThat(rest.getForObject("/not/a/function", String.class)) - .isEqualTo("[\"hello\"]"); - } - - @Test - public void convertGetJson() throws Exception { - assertThat(rest - .exchange(RequestEntity.get(new URI("/entity/321")) - .accept(MediaType.APPLICATION_JSON).build(), String.class) - .getBody()).isEqualTo("{\"value\":321}"); - } - @Test public void uppercaseJsonArray() throws Exception { assertThat(rest.exchange( @@ -475,7 +327,7 @@ public class RestApplicationTests { private List list = new ArrayList<>(); public static void main(String[] args) throws Exception { - SpringApplication.run(RestApplicationTests.ApplicationConfiguration.class, + SpringApplication.run(HttpPostIntegrationTests.ApplicationConfiguration.class, args); } @@ -525,12 +377,6 @@ public class RestApplicationTests { return flux -> flux.log().map(value -> 2 * value); } - @Bean - public Function, Flux>> entity() { - return flux -> flux.log() - .map(value -> Collections.singletonMap("value", value)); - } - @Bean public Function>, Flux>> maps() { return flux -> flux.map(value -> { @@ -539,32 +385,12 @@ public class RestApplicationTests { }); } - @Bean({ "words", "get/more" }) - public Supplier> words() { - return () -> Flux.just("foo", "bar"); - } - - @Bean - public Supplier word() { - return () -> "foo"; - } - - @Bean - public Supplier> foos() { - return () -> Flux.just(new Foo("foo"), new Foo("bar")); - } - @Bean @Qualifier("foos") public Function qualifier() { return value -> new Foo("[" + value.trim().toUpperCase() + "]"); } - @Bean - public Supplier> bareWords() { - return () -> Arrays.asList("foo", "bar"); - } - @Bean public Consumer> updates() { return flux -> flux.subscribe(value -> list.add(value)); @@ -580,44 +406,11 @@ public class RestApplicationTests { return value -> list.add(value); } - @Bean - public Supplier> bang() { - return () -> Flux.fromArray(new String[] { "foo", "bar" }).map(value -> { - if (value.equals("bar")) { - throw new RuntimeException("Bar"); - } - return value; - }); - } - - @Bean - public Supplier> empty() { - return () -> Flux.fromIterable(Collections.emptyList()); - } - - @Bean("not/a/function") - public Supplier> supplier() { - return () -> Flux.just("hello"); - } - @Bean("not/a") public Function, Flux> function() { return input -> Flux.just("bye"); } - @Bean - public Supplier> timeout() { - return () -> Flux.defer(() -> Flux.create(emitter -> { - emitter.next("foo"); - }).timeout(Duration.ofMillis(100L), Flux.empty())); - } - - @Bean - public Supplier>> sentences() { - return () -> Flux.just(Arrays.asList("go", "home"), - Arrays.asList("come", "back")); - } - @Bean public Function, Map> sum() { return valueMap -> valueMap.entrySet().stream() diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/PrefixTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/PrefixTests.java new file mode 100644 index 000000000..5437d40c9 --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/PrefixTests.java @@ -0,0 +1,83 @@ +/* + * Copyright 2012-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.mvc; + +import java.net.URI; +import java.util.function.Supplier; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.cloud.function.web.RestApplication; +import org.springframework.cloud.function.web.mvc.PrefixTests.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.HttpStatus; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; + +/** + * @author Dave Syer + * + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = { + "spring.main.web-application-type=servlet", + "spring.cloud.function.web.path=/functions" }) +@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class}) +public class PrefixTests { + + @LocalServerPort + private int port; + @Autowired + private TestRestTemplate rest; + + @Test + public void words() throws Exception { + ResponseEntity result = rest.exchange( + RequestEntity.get(new URI("/functions/words")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + public void missing() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/words")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND); + } + + @EnableAutoConfiguration + @org.springframework.boot.test.context.TestConfiguration + protected static class TestConfiguration { + @Bean({ "words", "get/more" }) + public Supplier> words() { + return () -> Flux.fromArray(new String[] { "foo", "bar" }); + } + } +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/SingletonTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/SingletonTests.java similarity index 90% rename from spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/SingletonTests.java rename to spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/SingletonTests.java index c56e23c6b..4cc81995b 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/SingletonTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/SingletonTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.cloud.function.web; +package org.springframework.cloud.function.web.mvc; import java.net.URI; import java.util.function.Supplier; @@ -33,10 +33,13 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.cloud.function.web.RestApplication; +import org.springframework.cloud.function.web.mvc.SingletonTests.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.http.HttpStatus; import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; +import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import static org.assertj.core.api.Assertions.assertThat; @@ -49,6 +52,7 @@ import reactor.core.publisher.Flux; */ @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) +@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class}) public class SingletonTests { @LocalServerPort diff --git a/spring-cloud-starter-function-web/src/main/resources/META-INF/spring.provides b/spring-cloud-starter-function-web/src/main/resources/META-INF/spring.provides index 15c50227c..d7c921aae 100644 --- a/spring-cloud-starter-function-web/src/main/resources/META-INF/spring.provides +++ b/spring-cloud-starter-function-web/src/main/resources/META-INF/spring.provides @@ -1 +1 @@ -provides: spring-cloud-security \ No newline at end of file +provides: spring-cloud-function-context \ No newline at end of file diff --git a/spring-cloud-starter-function-webflux/pom.xml b/spring-cloud-starter-function-webflux/pom.xml new file mode 100644 index 000000000..727288c30 --- /dev/null +++ b/spring-cloud-starter-function-webflux/pom.xml @@ -0,0 +1,29 @@ + + + 4.0.0 + + org.springframework.cloud + spring-cloud-function-parent + 2.0.0.BUILD-SNAPSHOT + .. + + spring-cloud-starter-function-webflux + spring-cloud-starter-function-webflux + Spring Cloud Starter + https://projects.spring.io/spring-cloud + + Pivotal Software, Inc. + https://www.spring.io + + + + org.springframework.cloud + spring-cloud-function-web + + + org.springframework.boot + spring-boot-starter-webflux + + + diff --git a/spring-cloud-starter-function-webflux/src/main/resources/META-INF/spring.provides b/spring-cloud-starter-function-webflux/src/main/resources/META-INF/spring.provides new file mode 100644 index 000000000..d7c921aae --- /dev/null +++ b/spring-cloud-starter-function-webflux/src/main/resources/META-INF/spring.provides @@ -0,0 +1 @@ +provides: spring-cloud-function-context \ No newline at end of file