Add support for webflux

MVC and Webflux share some common business logic
so it makes sense to put them in the same module.
Also simplifies the MVC code, removing the custom argument
and return value handlers.

Weed out unecessary method param

Unignore some tests
This commit is contained in:
Dave Syer
2018-07-30 15:10:41 +01:00
committed by Oleg Zhurakousky
parent ed14474b9f
commit 7dd38edf84
37 changed files with 2716 additions and 1353 deletions

View File

@@ -0,0 +1,317 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.function.web.util.HeaderUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.ResponseEntity.BodyBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Dave Syer
*
*/
public class RequestProcessor {
private static Log logger = LogFactory.getLog(RequestProcessor.class);
private FunctionInspector inspector;
private StringConverter converter;
private JsonMapper mapper;
@Value("${debug:${DEBUG:false}}")
private String debug = "false";
public RequestProcessor(JsonMapper mapper, FunctionInspector inspector,
StringConverter converter) {
this.mapper = mapper;
this.inspector = inspector;
this.converter = converter;
}
public static FunctionWrapper wrapper(Function<Publisher<?>, Publisher<?>> function,
Consumer<Publisher<?>> consumer, Supplier<Publisher<?>> supplier) {
return new FunctionWrapper(function, consumer, supplier);
}
public static class FunctionWrapper {
private Function<Publisher<?>, Publisher<?>> function;
private Consumer<Publisher<?>> consumer;
private Supplier<Publisher<?>> supplier;
private MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
private HttpHeaders headers = new HttpHeaders();
private String argument;
public FunctionWrapper(Function<Publisher<?>, Publisher<?>> function,
Consumer<Publisher<?>> consumer, Supplier<Publisher<?>> supplier) {
this.function = function;
this.consumer = consumer;
this.supplier = supplier;
}
public Object handler() {
return function != null ? function : consumer != null ? consumer : supplier;
}
public Function<Publisher<?>, Publisher<?>> function() {
return this.function;
}
public Consumer<Publisher<?>> consumer() {
return this.consumer;
}
public Supplier<Publisher<?>> supplier() {
return this.supplier;
}
public MultiValueMap<String, String> params() {
return params;
}
public HttpHeaders headers() {
return this.headers;
}
public FunctionWrapper headers(HttpHeaders headers) {
this.headers = headers;
return this;
}
public FunctionWrapper params(MultiValueMap<String, String> params) {
this.params.addAll(params);
return this;
}
public FunctionWrapper argument(String argument) {
this.argument = argument;
return this;
}
public String argument() {
return this.argument;
}
}
public Mono<ResponseEntity<?>> post(FunctionWrapper wrapper, String body,
boolean stream) {
Object function = wrapper.handler();
if (!StringUtils.hasText(body)) {
return post(wrapper, (List<?>) null, null, stream);
}
body = body.trim();
Object input;
Class<?> inputType = inspector.getInputType(function);
if (body.startsWith("[")) {
input = mapper.toList(body, inputType);
}
else {
if (inputType == String.class) {
input = body;
}
else if (body.startsWith("{")) {
input = mapper.toSingle(body, inputType);
}
else if (body.startsWith("\"")) {
input = body.substring(1, body.length() - 2);
}
else {
input = converter.convert(function, body);
}
}
if (input instanceof List) {
return post(wrapper, (List<?>) input, null, stream);
}
return post(wrapper, Collections.singletonList(input), null, stream);
}
private Mono<ResponseEntity<?>> post(FunctionWrapper wrapper, List<?> body,
MultiValueMap<String, String> params, boolean stream) {
Function<Publisher<?>, Publisher<?>> function = wrapper.function();
Consumer<Publisher<?>> consumer = wrapper.consumer();
MultiValueMap<String, String> form = wrapper.params();
if (params != null) {
form.putAll(params);
}
Flux<?> flux = body == null ? Flux.just(form) : Flux.fromIterable(body);
if (inspector.isMessage(function)) {
flux = messages(wrapper, function == null ? consumer : function, flux);
}
if (function != null) {
Flux<?> result = Flux.from(function.apply(flux));
if (logger.isDebugEnabled()) {
logger.debug("Handled POST with function");
}
if (stream) {
return stream(wrapper, result);
}
return response(function, result, body == null ? null : body.size()<=1, false);
}
if (consumer != null) {
consumer.accept(flux);
if (logger.isDebugEnabled()) {
logger.debug("Handled POST with consumer");
}
return Mono.just(ResponseEntity.status(HttpStatus.ACCEPTED).build());
}
throw new IllegalArgumentException("no such function");
}
private Flux<?> messages(FunctionWrapper request, Object function, Flux<?> flux) {
Map<String, Object> headers = HeaderUtils.fromHttp(request.headers());
flux = flux.map(payload -> MessageUtils.create(function, payload, headers));
return flux;
}
private void addHeaders(BodyBuilder builder, Message<?> message) {
HttpHeaders headers = new HttpHeaders();
builder.headers(HeaderUtils.fromMessage(message.getHeaders(), headers));
}
public Mono<ResponseEntity<?>> stream(FunctionWrapper request) {
Publisher<?> result;
if (request.function()!=null) {
result = value(request.function(), request.argument());
} else {
result = supplier(request.supplier());
}
return stream(request, result);
}
private Mono<ResponseEntity<?>> stream(FunctionWrapper request, Publisher<?> result) {
BodyBuilder builder = ResponseEntity.ok();
if (inspector.isMessage(request.handler())) {
result = Flux.from(result)
.doOnNext(value -> addHeaders(builder, (Message<?>) value))
.map(message -> MessageUtils.unpack(request.handler(), message)
.getPayload());
}
Publisher<?> output = result;
return Flux.from(output).then(Mono.fromSupplier(() -> builder.body(output)));
}
private Mono<ResponseEntity<?>> response(Object handler, Publisher<?> result,
Boolean single, boolean getter) {
BodyBuilder builder = ResponseEntity.ok();
if (inspector.isMessage(handler)) {
result = Flux.from(result)
.doOnNext(value -> addHeaders(builder, (Message<?>) value))
.map(message -> MessageUtils.unpack(handler, message).getPayload());
}
if (single != null && single && isOutputSingle(handler)) {
result = Mono.from(result);
}
else if (getter && single == null && isOutputSingle(handler)) {
result = Mono.from(result);
}
else if (isInputMultiple(handler) && isOutputSingle(handler)) {
result = Mono.from(result);
}
Publisher<?> output = result;
if (output instanceof Mono) {
return Mono.from(output).flatMap(body -> Mono.just(builder.body(body)));
}
return Flux.from(output).collectList()
.flatMap(body -> Mono.just(builder.body(body)));
}
private boolean isInputMultiple(Object handler) {
Class<?> type = inspector.getInputType(handler);
Class<?> wrapper = inspector.getInputWrapper(handler);
return Collection.class.isAssignableFrom(type) || Flux.class.equals(wrapper);
}
private boolean isOutputSingle(Object handler) {
Class<?> type = inspector.getOutputType(handler);
Class<?> wrapper = inspector.getOutputWrapper(handler);
if (Stream.class.isAssignableFrom(type)) {
return false;
}
if (wrapper == type) {
return true;
}
return Mono.class.equals(wrapper) || Optional.class.equals(wrapper);
}
private Publisher<?> supplier(Supplier<Publisher<?>> supplier) {
Publisher<?> result = supplier.get();
return result;
}
private Mono<?> value(Function<Publisher<?>, Publisher<?>> function, String value) {
Object input = converter.convert(function, value);
Mono<?> result = Mono.from(function.apply(Flux.just(input)));
return result;
}
public Mono<ResponseEntity<?>> get(FunctionWrapper wrapper) {
if (wrapper.function() != null) {
return response(wrapper.function(), value(wrapper.function(), wrapper.argument()), true, true);
}
else {
return response(wrapper.supplier(), supplier(wrapper.supplier()), null, true);
}
}
}

View File

@@ -17,12 +17,14 @@
package org.springframework.cloud.function.web;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
/**
* @author Mark Fisher
*/
@SpringBootApplication
@SpringBootConfiguration
@EnableAutoConfiguration
public class RestApplication {
public static void main(String[] args) {

View File

@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
package org.springframework.cloud.function.web;
/**
* @author Dave Syer

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux.constants;
package org.springframework.cloud.function.web.constants;
/**
* Common storage for web request attribute names (in a separate package to avoid cycles).
@@ -33,9 +33,5 @@ public abstract class WebRequestConstants {
+ ".argument";
public static final String HANDLER = WebRequestConstants.class.getName()
+ ".handler";
public static final String INPUT_SINGLE = WebRequestConstants.class.getName()
+ ".input_single";
public static final String OUTPUT_SINGLE = WebRequestConstants.class.getName()
+ ".output_single";
}

View File

@@ -16,32 +16,24 @@
package org.springframework.cloud.function.web.flux;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.web.flux.constants.WebRequestConstants;
import org.springframework.cloud.function.web.flux.request.FluxFormRequest;
import org.springframework.cloud.function.web.flux.request.FluxRequest;
import org.springframework.http.HttpStatus;
import org.springframework.cloud.function.web.RequestProcessor;
import org.springframework.cloud.function.web.RequestProcessor.FunctionWrapper;
import org.springframework.cloud.function.web.constants.WebRequestConstants;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.context.request.WebRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -51,149 +43,65 @@ import reactor.core.publisher.Mono;
@Component
public class FunctionController {
private static Log logger = LogFactory.getLog(FunctionController.class);
private RequestProcessor processor;
private FunctionInspector inspector;
private boolean debug = false;
private StringConverter converter;
public FunctionController(FunctionInspector inspector, StringConverter converter) {
this.inspector = inspector;
this.converter = converter;
public FunctionController(RequestProcessor processor) {
this.processor = processor;
}
public void setDebug(boolean debug) {
this.debug = debug;
@PostMapping(path = "/**", consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
@ResponseBody
public Mono<ResponseEntity<?>> form(ServerWebExchange request) {
FunctionWrapper wrapper = wrapper(request);
return request.getFormData().doOnSuccess(params -> wrapper.params(params))
.then(processor.post(wrapper, null, false));
}
@PostMapping(path = "/**")
@ResponseBody
public ResponseEntity<Publisher<?>> post(WebRequest request,
@RequestBody FluxRequest<?> body) {
@SuppressWarnings("unchecked")
Function<Publisher<?>, Publisher<?>> function = (Function<Publisher<?>, Publisher<?>>) request
.getAttribute(WebRequestConstants.FUNCTION, WebRequest.SCOPE_REQUEST);
@SuppressWarnings("unchecked")
Consumer<Publisher<?>> consumer = (Consumer<Publisher<?>>) request
.getAttribute(WebRequestConstants.CONSUMER, WebRequest.SCOPE_REQUEST);
Boolean single = (Boolean) request.getAttribute(WebRequestConstants.INPUT_SINGLE,
WebRequest.SCOPE_REQUEST);
FluxFormRequest<?, ?> form = FluxFormRequest.from(request.getParameterMap());
if (function != null) {
Flux<?> flux = body.body() == null ? form.flux() : body.flux();
if (debug) {
flux = flux.log();
}
Flux<?> result = Flux.from(function.apply(flux));
if (inspector.isMessage(function)) {
result = result.map(message -> MessageUtils.unpack(function, message));
}
if (logger.isDebugEnabled()) {
logger.debug("Handled POST with function");
}
return ResponseEntity.ok().body(response(request, function, single, result));
}
if (consumer != null) {
Flux<?> flux = body.body() == null ? form.flux().cache()
: body.flux().cache(); // send a copy back to the caller
if (debug) {
flux = flux.log();
}
consumer.accept(flux);
if (logger.isDebugEnabled()) {
logger.debug("Handled POST with consumer");
}
return ResponseEntity.status(HttpStatus.ACCEPTED).body(flux);
}
throw new IllegalArgumentException("no such function");
public Mono<ResponseEntity<?>> post(ServerWebExchange request,
@RequestBody(required = false) String body) {
FunctionWrapper wrapper = wrapper(request);
return processor.post(wrapper, body, false);
}
private Publisher<?> response(WebRequest request, Object handler, Boolean single,
Publisher<?> result) {
if (single != null && single && isOutputSingle(handler)) {
request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, true,
WebRequest.SCOPE_REQUEST);
return Mono.from(result);
}
if (isInputMultiple(handler) && isOutputSingle(handler)) {
request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, true,
WebRequest.SCOPE_REQUEST);
return Mono.from(result);
}
request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, false,
WebRequest.SCOPE_REQUEST);
return result;
}
private boolean isInputMultiple(Object handler) {
Class<?> type = inspector.getInputType(handler);
Class<?> wrapper = inspector.getInputWrapper(handler);
return Collection.class.isAssignableFrom(type) || Flux.class.equals(wrapper);
}
private boolean isOutputSingle(Object handler) {
Class<?> type = inspector.getOutputType(handler);
Class<?> wrapper = inspector.getOutputWrapper(handler);
if (Stream.class.isAssignableFrom(type)) {
return false;
}
if (wrapper == type) {
return true;
}
return Mono.class.equals(wrapper) || Optional.class.equals(wrapper);
@PostMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Mono<ResponseEntity<?>> postStream(ServerWebExchange request,
@RequestBody(required = false) String body) {
FunctionWrapper wrapper = wrapper(request);
return processor.post(wrapper, body, true);
}
@GetMapping(path = "/**")
@ResponseBody
public ResponseEntity<Publisher<?>> get(WebRequest request) {
public Mono<ResponseEntity<?>> get(ServerWebExchange request) {
FunctionWrapper wrapper = wrapper(request);
return processor.get(wrapper);
}
@GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Mono<ResponseEntity<?>> getStream(ServerWebExchange request) {
FunctionWrapper wrapper = wrapper(request);
return processor.stream(wrapper);
}
private FunctionWrapper wrapper(ServerWebExchange request) {
@SuppressWarnings("unchecked")
Function<Publisher<?>, Publisher<?>> function = (Function<Publisher<?>, Publisher<?>>) request
.getAttribute(WebRequestConstants.FUNCTION, WebRequest.SCOPE_REQUEST);
.getAttribute(WebRequestConstants.FUNCTION);
@SuppressWarnings("unchecked")
Consumer<Publisher<?>> consumer = (Consumer<Publisher<?>>) request
.getAttribute(WebRequestConstants.CONSUMER);
@SuppressWarnings("unchecked")
Supplier<Publisher<?>> supplier = (Supplier<Publisher<?>>) request
.getAttribute(WebRequestConstants.SUPPLIER, WebRequest.SCOPE_REQUEST);
String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT,
WebRequest.SCOPE_REQUEST);
Publisher<?> result;
if (function != null) {
result = value(function, argument);
}
else {
result = response(request, supplier, true, supplier(supplier));
}
if (inspector.isMessage(function)) {
result = Flux.from(result)
.map(message -> MessageUtils.unpack(function, message));
}
return ResponseEntity.ok().body(result);
}
private Publisher<?> supplier(Supplier<Publisher<?>> supplier) {
Publisher<?> result = supplier.get();
if (logger.isDebugEnabled()) {
logger.debug("Handled GET with supplier");
}
return debug ? Flux.from(result).log() : result;
}
private Mono<?> value(Function<Publisher<?>, Publisher<?>> function, String value) {
Object input = converter.convert(function, value);
Mono<?> result = Mono.from(function.apply(Flux.just(input)));
if (logger.isDebugEnabled()) {
logger.debug("Handled GET with function");
}
return debug ? result.log() : result;
.getAttribute(WebRequestConstants.SUPPLIER);
FunctionWrapper wrapper = RequestProcessor.wrapper(function, consumer, supplier);
wrapper.headers(request.getRequest().getHeaders());
wrapper.params(request.getRequest().getQueryParams());
String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT);
wrapper.argument(argument);
return wrapper;
}
}

View File

@@ -20,8 +20,6 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.servlet.http.HttpServletRequest;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.InitializingBean;
@@ -29,12 +27,15 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.web.flux.constants.WebRequestConstants;
import org.springframework.cloud.function.web.constants.WebRequestConstants;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.util.StringUtils;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping;
import org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
/**
* @author Dave Syer
@@ -52,9 +53,6 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
@Value("${spring.cloud.function.web.path:}")
private String prefix = "";
@Value("${debug:${DEBUG:false}}")
private String debug = "false";
@Autowired
public FunctionHandlerMapping(FunctionCatalog catalog,
FunctionController controller) {
@@ -67,7 +65,7 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
@Override
public void afterPropertiesSet() {
super.afterPropertiesSet();
this.controller.setDebug(!"false".equals(debug));
// this.controller.setDebug(!"false".equals(debug));
detectHandlerMethods(controller);
while (prefix.endsWith("/")) {
prefix = prefix.substring(0, prefix.length() - 1);
@@ -79,68 +77,58 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
}
@Override
protected HandlerMethod getHandlerInternal(HttpServletRequest request)
throws Exception {
HandlerMethod handler = super.getHandlerInternal(request);
if (handler == null) {
return null;
}
String path = (String) request
.getAttribute(HandlerMapping.PATH_WITHIN_HANDLER_MAPPING_ATTRIBUTE);
public Mono<HandlerMethod> getHandlerInternal(ServerWebExchange request) {
String path = request.getRequest().getPath().pathWithinApplication().value();
if (StringUtils.hasText(prefix) && !path.startsWith(prefix)) {
return null;
return Mono.empty();
}
Mono<HandlerMethod> handler = super.getHandlerInternal(request);
if (path == null) {
return handler;
}
if (path.startsWith(prefix)) {
path = path.substring(prefix.length());
}
if (path == null) {
return handler;
}
Object function = findFunctionForGet(request, path);
if (function != null) {
if (logger.isDebugEnabled()) {
logger.debug("Found function for GET: " + path);
}
request.setAttribute(WebRequestConstants.HANDLER, function);
return handler;
if (function == null) {
function = findFunctionForPost(request, path);
}
function = findFunctionForPost(request, path);
if (function != null) {
if (logger.isDebugEnabled()) {
logger.debug("Found function for POST: " + path);
}
request.setAttribute(WebRequestConstants.HANDLER, function);
return handler;
request.getAttributes().put(WebRequestConstants.HANDLER, function);
}
return null;
Object actual = function;
return handler.filter(method -> actual != null);
}
private Object findFunctionForPost(HttpServletRequest request, String path) {
if (!request.getMethod().equals("POST")) {
private Object findFunctionForPost(ServerWebExchange request, String path) {
if (!request.getRequest().getMethod().equals(HttpMethod.POST)) {
return null;
}
path = path.startsWith("/") ? path.substring(1) : path;
Consumer<Publisher<?>> consumer = functions.lookup(Consumer.class, path);
if (consumer != null) {
request.setAttribute(WebRequestConstants.CONSUMER, consumer);
request.getAttributes().put(WebRequestConstants.CONSUMER, consumer);
return consumer;
}
Function<Object, Object> function = functions.lookup(Function.class, path);
if (function != null) {
request.setAttribute(WebRequestConstants.FUNCTION, function);
request.getAttributes().put(WebRequestConstants.FUNCTION, function);
return function;
}
return null;
}
private Object findFunctionForGet(HttpServletRequest request, String path) {
if (!request.getMethod().equals("GET")) {
private Object findFunctionForGet(ServerWebExchange request, String path) {
if (!request.getRequest().getMethod().equals(HttpMethod.GET)) {
return null;
}
path = path.startsWith("/") ? path.substring(1) : path;
Supplier<Publisher<?>> supplier = functions.lookup(Supplier.class, path);
if (supplier != null) {
request.setAttribute(WebRequestConstants.SUPPLIER, supplier);
request.getAttributes().put(WebRequestConstants.SUPPLIER, supplier);
return supplier;
}
StringBuilder builder = new StringBuilder();
@@ -156,8 +144,8 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
: null;
Function<Object, Object> function = functions.lookup(Function.class, name);
if (function != null) {
request.setAttribute(WebRequestConstants.FUNCTION, function);
request.setAttribute(WebRequestConstants.ARGUMENT, value);
request.getAttributes().put(WebRequestConstants.FUNCTION, function);
request.getAttributes().put(WebRequestConstants.ARGUMENT, value);
return function;
}
}

View File

@@ -16,34 +16,24 @@
package org.springframework.cloud.function.web.flux;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication.Type;
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
import org.springframework.boot.autoconfigure.http.HttpMessageConverters;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.function.web.flux.request.FluxHandlerMethodArgumentResolver;
import org.springframework.cloud.function.web.flux.response.FluxReturnValueHandler;
import org.springframework.context.ApplicationContext;
import org.springframework.cloud.function.web.RequestProcessor;
import org.springframework.cloud.function.web.StringConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler;
import org.springframework.web.method.support.HandlerMethodArgumentResolver;
import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter;
import reactor.core.publisher.Flux;
@@ -53,15 +43,12 @@ import reactor.core.publisher.Flux;
* @author Oleg Zhurakousky
*/
@Configuration
@ConditionalOnWebApplication
@ConditionalOnClass({ Flux.class, AsyncHandlerMethodReturnValueHandler.class })
@Import(FunctionController.class)
@ConditionalOnWebApplication(type=Type.REACTIVE)
@Import({FunctionController.class, RequestProcessor.class})
@AutoConfigureAfter({ JacksonAutoConfiguration.class, GsonAutoConfiguration.class })
public class ReactorAutoConfiguration {
@Autowired
private ApplicationContext context;
@Bean
public FunctionHandlerMapping functionHandlerMapping(FunctionCatalog catalog,
FunctionController controller) {
@@ -75,44 +62,6 @@ public class ReactorAutoConfiguration {
return new BasicStringConverter(inspector, beanFactory);
}
@Configuration
protected static class FluxReturnValueConfiguration {
@Bean
public FluxReturnValueHandler fluxReturnValueHandler(FunctionInspector inspector,
HttpMessageConverters converters) {
return new FluxReturnValueHandler(inspector, converters.getConverters());
}
}
@Configuration
protected static class FluxArgumentResolverConfiguration {
@Bean
public FluxHandlerMethodArgumentResolver fluxHandlerMethodArgumentResolver(
FunctionInspector inspector, JsonMapper mapper) {
return new FluxHandlerMethodArgumentResolver(inspector, mapper);
}
}
@Bean
public SmartInitializingSingleton fluxRequestMappingHandlerAdapterProcessor(
RequestMappingHandlerAdapter adapter,
FluxHandlerMethodArgumentResolver resolver) {
return new SmartInitializingSingleton() {
@Override
public void afterSingletonsInstantiated() {
List<HandlerMethodArgumentResolver> resolvers = new ArrayList<>(
adapter.getArgumentResolvers());
resolvers.add(0, resolver);
adapter.setArgumentResolvers(resolvers);
List<HandlerMethodReturnValueHandler> handlers = new ArrayList<>(
adapter.getReturnValueHandlers());
handlers.add(0, context.getBean(FluxReturnValueHandler.class));
adapter.setReturnValueHandlers(handlers);
}
};
}
private static class BasicStringConverter implements StringConverter {

View File

@@ -1,53 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux.request;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
public abstract class DelegateHandler<T> {
private final ListableBeanFactory factory;
private FunctionInspector processor;
private Object handler;
private final Object source;
public DelegateHandler(ListableBeanFactory factory, Object source) {
this.factory = factory;
this.source = source;
}
public Class<?> type() {
return processor().getInputType(handler());
}
private Object handler() {
if (handler == null) {
handler = source instanceof String ? factory.getBean((String) source)
: source;
}
return handler;
}
private FunctionInspector processor() {
if (processor == null) {
processor = factory.getBean(FunctionInspector.class);
}
return processor;
}
}

View File

@@ -1,41 +0,0 @@
package org.springframework.cloud.function.web.flux.request;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import reactor.core.publisher.Flux;
import java.util.Arrays;
import java.util.Map;
public class FluxFormRequest<K, V> {
private Map<K, V[]> map;
public FluxFormRequest(Map<K, V[]> map) {
this.map = map;
}
public static <K, V> FluxFormRequest<K, V> from(Map<K, V[]> map) {
return new FluxFormRequest<>(map);
}
public Flux<MultiValueMap<K, V>> flux() {
return Flux.just(buildMap());
}
public MultiValueMap<K, V> body() {
return buildMap();
}
private MultiValueMap<K, V> buildMap() {
if (map == null)
return null;
MultiValueMap<K, V> result = new LinkedMultiValueMap<>();
map.forEach((key, values) -> result.put(key, Arrays.asList(values)));
return result;
}
}

View File

@@ -1,137 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux.request;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.function.web.flux.constants.WebRequestConstants;
import org.springframework.cloud.function.web.util.HeaderUtils;
import org.springframework.core.MethodParameter;
import org.springframework.core.Ordered;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.StreamUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.support.WebDataBinderFactory;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.method.support.HandlerMethodArgumentResolver;
import org.springframework.web.method.support.ModelAndViewContainer;
import org.springframework.web.util.ContentCachingRequestWrapper;
/**
* Converter for request bodies of type <code>Flux<String></code>.
*
* @author Dave Syer
*
*/
public class FluxHandlerMethodArgumentResolver
implements HandlerMethodArgumentResolver, Ordered {
private static Log logger = LogFactory
.getLog(FluxHandlerMethodArgumentResolver.class);
private final JsonMapper mapper;
private FunctionInspector inspector;
public FluxHandlerMethodArgumentResolver(FunctionInspector inspector,
JsonMapper mapper) {
this.inspector = inspector;
this.mapper = mapper;
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
@Override
public Object resolveArgument(MethodParameter parameter,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest,
WebDataBinderFactory binderFactory) throws Exception {
Object handler = webRequest.getAttribute(WebRequestConstants.HANDLER,
NativeWebRequest.SCOPE_REQUEST);
Class<?> type = inspector.getInputType(handler);
if (type == null) {
type = Object.class;
}
boolean message = inspector.isMessage(handler);
List<?> body;
ContentCachingRequestWrapper nativeRequest = new ContentCachingRequestWrapper(
webRequest.getNativeRequest(HttpServletRequest.class));
if (logger.isDebugEnabled()) {
logger.debug("Resolving request body into type: " + type);
}
if (isPlainText(webRequest) && CharSequence.class.isAssignableFrom(type)) {
body = Arrays.asList(StreamUtils.copyToString(nativeRequest.getInputStream(),
Charset.forName("UTF-8")));
nativeRequest.setAttribute(WebRequestConstants.INPUT_SINGLE, true);
}
else {
String json = new String(StreamUtils.copyToString(
nativeRequest.getInputStream(), Charset.forName("UTF-8")));
if (!StringUtils.hasText(json)) {
body = null;
}
else {
if (json.startsWith("[")) {
body = mapper.toList(json, type);
}
else {
nativeRequest.setAttribute(WebRequestConstants.INPUT_SINGLE, true);
body = Arrays.asList(mapper.toSingle(json, type));
}
}
}
if (body != null && message) {
List<Object> messages = new ArrayList<>();
MessageHeaders headers = HeaderUtils.fromHttp(new ServletServerHttpRequest(
webRequest.getNativeRequest(HttpServletRequest.class)).getHeaders());
for (Object payload : body) {
messages.add(MessageUtils.create(handler, payload, headers));
}
body = messages;
}
return new FluxRequest<>(body);
}
private boolean isPlainText(NativeWebRequest webRequest) {
String value = webRequest.getHeader("Content-Type");
if (value != null) {
return MediaType.valueOf(value).isCompatibleWith(MediaType.TEXT_PLAIN);
}
return false;
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return FluxRequest.class.isAssignableFrom(parameter.getParameterType());
}
}

View File

@@ -1,45 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux.request;
import java.util.List;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
public class FluxRequest<T> {
private List<T> body;
public FluxRequest(List<T> body) {
this.body = body;
}
public Flux<T> flux() {
return Flux.fromIterable(body);
}
public List<T> body() {
return body;
}
}

View File

@@ -1,60 +0,0 @@
/*
* Copyright 2013-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux.response;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import reactor.core.publisher.Flux;
/**
* A specialized {@link ResponseBodyEmitter} that handles {@link Flux} return types.
*
* @author Dave Syer
*/
class FluxResponseBodyEmitter extends ResponseBodyEmitter {
private final MediaType mediaType;
private ResponseBodyEmitterSubscriber subscriber;
public FluxResponseBodyEmitter(Publisher<?> observable) {
this(new HttpHeaders(), null, observable);
}
public FluxResponseBodyEmitter(HttpHeaders request, MediaType mediaType,
Publisher<?> observable) {
super();
this.mediaType = mediaType;
this.subscriber = new ResponseBodyEmitterSubscriber(request, mediaType,
observable, this, MediaType.APPLICATION_JSON.isCompatibleWith(mediaType));
}
@Override
protected void extendResponse(ServerHttpResponse outputMessage) {
super.extendResponse(outputMessage);
this.subscriber.extendResponse(outputMessage);
HttpHeaders headers = outputMessage.getHeaders();
if (headers.getContentType() == null && this.mediaType != null
&& !MediaType.ALL.equals(this.mediaType)) {
headers.setContentType(this.mediaType);
}
}
}

View File

@@ -1,55 +0,0 @@
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux.response;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;
/**
* A specialized {@link ResponseBodyEmitter} that handles {@link Flux} return types with
* SSE streams.
*
* @author Dave Syer
*/
class FluxResponseSseEmitter extends SseEmitter {
private ResponseBodyEmitterSubscriber subscriber;
public FluxResponseSseEmitter(Publisher<?> observable) {
this(new HttpHeaders(), MediaType.valueOf("text/plain"), observable);
}
public FluxResponseSseEmitter(HttpHeaders request, MediaType mediaType,
Publisher<?> observable) {
super();
this.subscriber = new ResponseBodyEmitterSubscriber(request, mediaType,
observable, this, false);
}
@Override
protected void extendResponse(ServerHttpResponse outputMessage) {
super.extendResponse(outputMessage);
this.subscriber.extendResponse(outputMessage);
}
}

View File

@@ -1,253 +0,0 @@
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux.response;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.web.flux.constants.WebRequestConstants;
import org.springframework.cloud.function.web.util.HeaderUtils;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.messaging.Message;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler;
import org.springframework.web.method.support.ModelAndViewContainer;
import org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* A specialized {@link AsyncHandlerMethodReturnValueHandler} that handles {@link Flux}
* return types.
*
* @author Dave Syer
*/
public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHandler {
private static Log logger = LogFactory.getLog(FluxReturnValueHandler.class);
private ResponseBodyEmitterReturnValueHandler delegate;
private RequestResponseBodyMethodProcessor single;
private long timeout = 1000L;
private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream");
private FunctionInspector inspector;
private MethodParameter singleReturnType;
public FluxReturnValueHandler(FunctionInspector inspector,
List<HttpMessageConverter<?>> messageConverters) {
this.inspector = inspector;
this.delegate = new ResponseBodyEmitterReturnValueHandler(messageConverters);
this.single = new RequestResponseBodyMethodProcessor(messageConverters);
Method method = ReflectionUtils.findMethod(getClass(), "singleValue");
singleReturnType = new MethodParameter(method, -1);
}
ResponseEntity<Object> singleValue() {
return null;
}
/**
* Timeout for clients. If no items are seen on an HTTP response in this period then
* the response is closed.
*
* @param timeout the timeout to set
*/
public void setTimeout(long timeout) {
this.timeout = timeout;
}
@Override
public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) {
if (returnValue != null) {
return supportsReturnType(returnType);
}
return false;
}
@Override
public boolean supportsReturnType(MethodParameter returnType) {
return (returnType.getParameterType() != null
&& (Publisher.class.isAssignableFrom(returnType.getParameterType())
|| isResponseEntity(returnType)))
|| Publisher.class
.isAssignableFrom(returnType.getMethod().getReturnType());
}
private boolean isResponseEntity(MethodParameter returnType) {
if (ResponseEntity.class.isAssignableFrom(returnType.getParameterType())) {
Class<?> bodyType = ResolvableType.forMethodParameter(returnType)
.getGeneric(0).resolve();
return bodyType != null && Publisher.class.isAssignableFrom(bodyType);
}
return false;
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest)
throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
Object adaptFrom = returnValue;
if (returnValue instanceof ResponseEntity) {
ResponseEntity<?> value = (ResponseEntity<?>) returnValue;
adaptFrom = value.getBody();
HttpServletResponse response = webRequest
.getNativeResponse(HttpServletResponse.class);
response.setStatus(value.getStatusCodeValue());
HttpHeaders headers = value.getHeaders();
for (String name : headers.keySet()) {
List<String> list = headers.get(name);
for (String header : list) {
response.addHeader(name, header);
}
}
}
Publisher<?> flux = (Publisher<?>) adaptFrom;
Object handler = webRequest.getAttribute(WebRequestConstants.HANDLER,
NativeWebRequest.SCOPE_REQUEST);
Class<?> type = inspector.getOutputType(handler);
if (isOutputSingle(webRequest, handler, type)) {
Object result = Flux.from(flux).blockFirst();
if (result instanceof Message) {
Message<?> message = (Message<?>) result;
result = message.getPayload();
addHeaders(webRequest, message);
}
single.handleReturnValue(result, singleReturnType, mavContainer, webRequest);
return;
}
MediaType mediaType = null;
if (isPlainText(webRequest) && (CharSequence.class.isAssignableFrom(type)
|| Void.class.isAssignableFrom(type))) {
mediaType = MediaType.TEXT_PLAIN;
}
else {
mediaType = findMediaType(webRequest);
}
if (logger.isDebugEnabled()) {
logger.debug(
"Handling return value " + type + " with media type: " + mediaType);
}
ServletServerHttpRequest request = new ServletServerHttpRequest(
webRequest.getNativeRequest(HttpServletRequest.class));
delegate.handleReturnValue(
getEmitter(timeout, flux, mediaType, request.getHeaders()), returnType,
mavContainer, webRequest);
}
private void addHeaders(NativeWebRequest webRequest, Message<?> message) {
HttpServletResponse response = webRequest
.getNativeResponse(HttpServletResponse.class);
ServletServerHttpRequest request = new ServletServerHttpRequest(
webRequest.getNativeRequest(HttpServletRequest.class));
HttpHeaders headers = HeaderUtils.fromMessage(message.getHeaders(),
request.getHeaders());
for (String name : headers.keySet()) {
for (Object object : headers.get(name)) {
response.addHeader(name, object.toString());
}
}
}
private boolean isOutputSingle(NativeWebRequest webRequest, Object handler,
Class<?> type) {
Boolean single = (Boolean) webRequest.getAttribute(
WebRequestConstants.OUTPUT_SINGLE, NativeWebRequest.SCOPE_REQUEST);
if (single == null) {
// If the declared return type is a collection then we can render it as a
// "single" value
return Collection.class.isAssignableFrom(type);
}
return single;
}
private MediaType findMediaType(NativeWebRequest webRequest) {
List<MediaType> accepts = Arrays.asList(MediaType.ALL);
MediaType mediaType = null;
if (webRequest.getHeader("Accept") != null) {
accepts = MediaType.parseMediaTypes(webRequest.getHeader("Accept"));
for (MediaType accept : accepts) {
if (!MediaType.ALL.equals(accept)
&& MediaType.APPLICATION_JSON.isCompatibleWith(accept)) {
mediaType = MediaType.APPLICATION_JSON;
// Prefer JSON if that is acceptable
break;
}
else if (mediaType == null) {
mediaType = accept;
}
}
}
if (mediaType == null) {
mediaType = MediaType.APPLICATION_JSON;
}
return mediaType;
}
private boolean isPlainText(NativeWebRequest webRequest) {
String value = webRequest.getHeader("Content-Type");
if (value != null) {
return MediaType.valueOf(value).isCompatibleWith(MediaType.TEXT_PLAIN);
}
return false;
}
private ResponseBodyEmitter getEmitter(Long timeout, Publisher<?> flux,
MediaType mediaType, HttpHeaders request) {
Publisher<?> exported = flux instanceof Mono ? Mono.from(flux)
: Flux.from(flux).timeout(Duration.ofMillis(timeout), Flux.empty());
if (!MediaType.ALL.equals(mediaType)
&& EVENT_STREAM.isCompatibleWith(mediaType)) {
// TODO: more subtle content negotiation
return new FluxResponseSseEmitter(request, MediaType.APPLICATION_JSON,
exported);
}
return new FluxResponseBodyEmitter(request, mediaType, exported);
}
}

View File

@@ -1,212 +0,0 @@
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux.response;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.cloud.function.web.util.HeaderUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.messaging.Message;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Subscriber that emits any value produced by the {@link Flux} into the delegated
* {@link ResponseBodyEmitter}.
*
* @author Dave Syer
*/
class ResponseBodyEmitterSubscriber implements Subscriber<Object> {
private final MediaType mediaType;
private Subscription subscription;
private final ResponseBodyEmitter responseBodyEmitter;
private boolean completed;
private boolean firstElementWritten;
private boolean single;
private final boolean json;
private Message<?> first;
private final HttpHeaders request;
public ResponseBodyEmitterSubscriber(HttpHeaders request, MediaType mediaType,
Publisher<?> observable, ResponseBodyEmitter responseBodyEmitter,
boolean json) {
this.request = request;
this.mediaType = mediaType;
this.responseBodyEmitter = responseBodyEmitter;
this.json = json;
this.responseBodyEmitter.onTimeout(new Timeout());
this.responseBodyEmitter.onCompletion(new Complete());
this.single = observable instanceof Mono;
observable.subscribe(this);
}
public void extendResponse(ServerHttpResponse response) {
headers(response);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object value) {
Object object = value;
if (object instanceof Message) {
Message<?> message = (Message<?>) object;
object = message.getPayload();
this.first = message;
}
try {
if (isJson()) {
if (!this.firstElementWritten) {
if (!single) {
responseBodyEmitter.send("[");
this.firstElementWritten = true;
}
}
else {
responseBodyEmitter.send(",");
}
if (!single && object.getClass() == String.class
&& !((String) object).contains("\"")) {
object = "\"" + object + "\"";
}
}
if (!completed) {
responseBodyEmitter.send(object, mediaType);
}
}
catch (
IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
private void headers(ServerHttpResponse response) {
if (this.first != null) {
Message<?> message = first;
try {
HttpHeaders headers = HeaderUtils.fromMessage(message.getHeaders(),
request);
for (String name : headers.keySet()) {
for (String value : headers.get(name)) {
response.getHeaders().add(name, value);
}
}
}
catch (Exception e) {
// Headers could not be set
}
}
}
@Override
public void onError(Throwable e) {
if (!completed) {
completed = true;
try {
if (isJson()) {
if (!single) {
if (!this.firstElementWritten) {
responseBodyEmitter.send("[]");
}
else {
responseBodyEmitter.send("]");
}
}
}
if (e instanceof TimeoutException) {
responseBodyEmitter.complete();
}
else {
responseBodyEmitter.completeWithError(e);
}
}
catch (IOException ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}
}
@Override
public void onComplete() {
if (!completed) {
completed = true;
try {
if (isJson()) {
if (!single) {
if (!this.firstElementWritten) {
responseBodyEmitter.send("[");
}
responseBodyEmitter.send("]");
}
}
}
catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
responseBodyEmitter.complete();
}
}
private boolean isJson() {
return json;
}
class Complete implements Runnable {
@Override
public void run() {
ResponseBodyEmitterSubscriber.this.subscription.cancel();
}
}
class Timeout implements Runnable {
@Override
public void run() {
onComplete();
ResponseBodyEmitterSubscriber.this.subscription.cancel();
}
}
}

View File

@@ -0,0 +1,116 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.mvc;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.web.RequestProcessor;
import org.springframework.cloud.function.web.RequestProcessor.FunctionWrapper;
import org.springframework.cloud.function.web.constants.WebRequestConstants;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.context.request.WebRequest;
import reactor.core.publisher.Mono;
/**
* @author Dave Syer
* @author Mark Fisher
*/
@Component
public class FunctionController {
private RequestProcessor processor;
public FunctionController(RequestProcessor processor) {
this.processor = processor;
}
@PostMapping(path = "/**", consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
@ResponseBody
public Mono<ResponseEntity<?>> form(WebRequest request) {
FunctionWrapper wrapper = wrapper(request);
return processor.post(wrapper, null, false);
}
@PostMapping(path = "/**")
@ResponseBody
public Mono<ResponseEntity<?>> post(WebRequest request,
@RequestBody(required = false) String body) {
FunctionWrapper wrapper = wrapper(request);
return processor.post(wrapper, body, false);
}
@PostMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Mono<ResponseEntity<Publisher<?>>> postStream(WebRequest request,
@RequestBody(required = false) String body) {
FunctionWrapper wrapper = wrapper(request);
return processor.post(wrapper, body, true).map(response -> ResponseEntity.ok()
.headers(response.getHeaders()).body((Publisher<?>) response.getBody()));
}
@GetMapping(path = "/**")
@ResponseBody
public Mono<ResponseEntity<?>> get(WebRequest request) {
FunctionWrapper wrapper = wrapper(request);
return processor.get(wrapper);
}
@GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Mono<ResponseEntity<Publisher<?>>> getStream(WebRequest request) {
FunctionWrapper wrapper = wrapper(request);
return processor.stream(wrapper).map(response -> ResponseEntity.ok()
.headers(response.getHeaders()).body((Publisher<?>) response.getBody()));
}
private FunctionWrapper wrapper(WebRequest request) {
@SuppressWarnings("unchecked")
Function<Publisher<?>, Publisher<?>> function = (Function<Publisher<?>, Publisher<?>>) request
.getAttribute(WebRequestConstants.FUNCTION, WebRequest.SCOPE_REQUEST);
@SuppressWarnings("unchecked")
Consumer<Publisher<?>> consumer = (Consumer<Publisher<?>>) request
.getAttribute(WebRequestConstants.CONSUMER, WebRequest.SCOPE_REQUEST);
@SuppressWarnings("unchecked")
Supplier<Publisher<?>> supplier = (Supplier<Publisher<?>>) request
.getAttribute(WebRequestConstants.SUPPLIER, WebRequest.SCOPE_REQUEST);
FunctionWrapper wrapper = RequestProcessor.wrapper(function, consumer, supplier);
for (String key : request.getParameterMap().keySet()) {
wrapper.params().addAll(key, Arrays.asList(request.getParameterValues(key)));
}
for (Iterator<String> keys = request.getHeaderNames(); keys.hasNext();) {
String key = keys.next();
wrapper.headers().addAll(key, Arrays.asList(request.getHeaderValues(key)));
}
String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT,
WebRequest.SCOPE_REQUEST);
wrapper.argument(argument);
return wrapper;
}
}

View File

@@ -0,0 +1,163 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.mvc;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.servlet.http.HttpServletRequest;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.web.constants.WebRequestConstants;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping;
/**
* @author Dave Syer
*
*/
@Configuration
@ConditionalOnClass(RequestMappingHandlerMapping.class)
public class FunctionHandlerMapping extends RequestMappingHandlerMapping
implements InitializingBean {
private final FunctionCatalog functions;
private final FunctionController controller;
@Value("${spring.cloud.function.web.path:}")
private String prefix = "";
@Autowired
public FunctionHandlerMapping(FunctionCatalog catalog,
FunctionController controller) {
this.functions = catalog;
logger.info("FunctionCatalog: " + catalog);
setOrder(super.getOrder() - 5);
this.controller = controller;
}
@Override
public void afterPropertiesSet() {
super.afterPropertiesSet();
detectHandlerMethods(controller);
while (prefix.endsWith("/")) {
prefix = prefix.substring(0, prefix.length() - 1);
}
}
@Override
protected void initHandlerMethods() {
}
@Override
protected HandlerMethod getHandlerInternal(HttpServletRequest request)
throws Exception {
HandlerMethod handler = super.getHandlerInternal(request);
if (handler == null) {
return null;
}
String path = (String) request
.getAttribute(HandlerMapping.PATH_WITHIN_HANDLER_MAPPING_ATTRIBUTE);
if (StringUtils.hasText(prefix) && !path.startsWith(prefix)) {
return null;
}
if (path.startsWith(prefix)) {
path = path.substring(prefix.length());
}
if (path == null) {
return handler;
}
Object function = findFunctionForGet(request, path);
if (function != null) {
if (logger.isDebugEnabled()) {
logger.debug("Found function for GET: " + path);
}
request.setAttribute(WebRequestConstants.HANDLER, function);
return handler;
}
function = findFunctionForPost(request, path);
if (function != null) {
if (logger.isDebugEnabled()) {
logger.debug("Found function for POST: " + path);
}
request.setAttribute(WebRequestConstants.HANDLER, function);
return handler;
}
return null;
}
private Object findFunctionForPost(HttpServletRequest request, String path) {
if (!request.getMethod().equals("POST")) {
return null;
}
path = path.startsWith("/") ? path.substring(1) : path;
Consumer<Publisher<?>> consumer = functions.lookup(Consumer.class, path);
if (consumer != null) {
request.setAttribute(WebRequestConstants.CONSUMER, consumer);
return consumer;
}
Function<Object, Object> function = functions.lookup(Function.class, path);
if (function != null) {
request.setAttribute(WebRequestConstants.FUNCTION, function);
return function;
}
return null;
}
private Object findFunctionForGet(HttpServletRequest request, String path) {
if (!request.getMethod().equals("GET")) {
return null;
}
path = path.startsWith("/") ? path.substring(1) : path;
Supplier<Publisher<?>> supplier = functions.lookup(Supplier.class, path);
if (supplier != null) {
request.setAttribute(WebRequestConstants.SUPPLIER, supplier);
return supplier;
}
StringBuilder builder = new StringBuilder();
String name = path;
String value = null;
for (String element : path.split("/")) {
if (builder.length() > 0) {
builder.append("/");
}
builder.append(element);
name = builder.toString();
value = path.length() > name.length() ? path.substring(name.length() + 1)
: null;
Function<Object, Object> function = functions.lookup(Function.class, name);
if (function != null) {
request.setAttribute(WebRequestConstants.FUNCTION, function);
request.setAttribute(WebRequestConstants.ARGUMENT, value);
return function;
}
}
return null;
}
}

View File

@@ -0,0 +1,92 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.mvc;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication.Type;
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.web.RequestProcessor;
import org.springframework.cloud.function.web.StringConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
* @author Mark Fisher
* @author Oleg Zhurakousky
*/
@Configuration
@ConditionalOnWebApplication(type=Type.SERVLET)
@ConditionalOnClass({ Flux.class, AsyncHandlerMethodReturnValueHandler.class })
@Import({ FunctionController.class, RequestProcessor.class })
@AutoConfigureAfter({ JacksonAutoConfiguration.class, GsonAutoConfiguration.class })
public class ReactorAutoConfiguration {
@Bean
public FunctionHandlerMapping functionHandlerMapping(FunctionCatalog catalog,
FunctionController controller) {
return new FunctionHandlerMapping(catalog, controller);
}
@Bean
@ConditionalOnMissingBean
public StringConverter functionStringConverter(FunctionInspector inspector,
ConfigurableListableBeanFactory beanFactory) {
return new BasicStringConverter(inspector, beanFactory);
}
private static class BasicStringConverter implements StringConverter {
private ConversionService conversionService;
private ConfigurableListableBeanFactory registry;
private FunctionInspector inspector;
public BasicStringConverter(FunctionInspector inspector,
ConfigurableListableBeanFactory registry) {
this.inspector = inspector;
this.registry = registry;
}
@Override
public Object convert(Object function, String value) {
if (conversionService == null && registry != null) {
ConversionService conversionService = this.registry
.getConversionService();
this.conversionService = conversionService != null ? conversionService
: new DefaultConversionService();
}
Class<?> type = inspector.getInputType(function);
return conversionService.canConvert(String.class, type)
? conversionService.convert(value, type)
: value;
}
}
}

View File

@@ -1,5 +1,6 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.function.web.flux.ReactorAutoConfiguration
org.springframework.cloud.function.web.flux.ReactorAutoConfiguration,\
org.springframework.cloud.function.web.mvc.ReactorAutoConfiguration
org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc=\
org.springframework.cloud.function.web.flux.ReactorAutoConfiguration,\

View File

@@ -0,0 +1,416 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.flux;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.function.flux.FluxRestApplicationTests.TestConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Tests for vanilla MVC handling (no function layer). Validates the MVC customizations
* that are added in this project independently of the specific concerns of function.
*
* @author Dave Syer
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=reactive")
public class FluxRestApplicationTests {
private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream");
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate rest;
@Autowired
private TestConfiguration test;
@Before
public void init() {
test.list.clear();
}
@Test
public void wordsSSE() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("/words")).accept(EVENT_STREAM).build(),
String.class).getBody()).isEqualTo(sse("foo", "bar"));
}
@Test
public void wordsJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/words"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
@Ignore("Fix error handling")
public void errorJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/bang"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void words() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void foos() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/foos")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]");
}
@Test
public void getMore() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/get/more")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
@Ignore("Should this even work? Or do we need to be explicit about the JSON?")
public void updates() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.post(new URI("/updates")).body("one\ntwo"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("onetwo");
}
@Test
public void updatesJson() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/updates")).contentType(MediaType.APPLICATION_JSON)
.body("[\"one\",\"two\"]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]");
}
@Test
public void addFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/addFoos")).contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]");
}
@Test
public void timeout() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/timeout")).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void emptyJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/empty"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[]");
}
@Test
public void sentences() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/sentences")).build(), String.class)
.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
public void sentencesAcceptAny() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(),
String.class).getBody())
.isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
public void sentencesAcceptJson() throws Exception {
ResponseEntity<String> result = rest
.exchange(
RequestEntity.get(new URI("/sentences"))
.accept(MediaType.APPLICATION_JSON).build(),
String.class);
assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
assertThat(result.getHeaders().getContentType())
.isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON);
}
@Test
public void uppercase() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]");
}
@Test
public void uppercaseFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/upFoos")).contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test
public void transform() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/transform")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]");
}
@Test
public void postMore() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/post/more")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]");
}
@Test
public void uppercaseGet() throws Exception {
assertThat(rest.exchange(RequestEntity.get(new URI("/uppercase/foo"))
.accept(MediaType.TEXT_PLAIN).build(), String.class).getBody())
.isEqualTo("[FOO]");
}
@Test
public void convertGet() throws Exception {
assertThat(rest.exchange(RequestEntity.get(new URI("/wrap/123"))
.accept(MediaType.TEXT_PLAIN).build(), String.class).getBody())
.isEqualTo("..123..");
}
@Test
public void convertGetJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/entity/321"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("{\"value\":321}");
}
@Test
public void uppercaseJsonStream() throws Exception {
assertThat(
rest.exchange(
RequestEntity.post(new URI("/maps"))
.contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"),
String.class).getBody())
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test
public void uppercaseSSE() throws Exception {
assertThat(rest.exchange(RequestEntity.post(new URI("/uppercase"))
.accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class).getBody())
.isEqualTo(sse("[FOO]", "[BAR]"));
}
@Test
public void altSSE() throws Exception {
assertThat(rest.exchange(RequestEntity.post(new URI("/alt")).accept(EVENT_STREAM)
.contentType(MediaType.APPLICATION_JSON).body("[\"foo\",\"bar\"]"),
String.class).getBody()).isEqualTo(sse("[FOO]", "[BAR]"));
}
private String sse(String... values) {
return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n";
}
@EnableAutoConfiguration
@RestController
@Configuration
public static class TestConfiguration {
private List<String> list = new ArrayList<>();
@PostMapping({ "/uppercase", "/transform", "/post/more" })
public Flux<?> uppercase(@RequestBody List<String> flux) {
return Flux.fromIterable(flux).log()
.map(value -> "[" + value.trim().toUpperCase() + "]");
}
@PostMapping({ "/alt" })
public Mono<ResponseEntity<?>> alt(@RequestBody List<String> flux) {
Publisher<?> result = Flux.fromIterable(flux)
.map(value -> "[" + value.trim().toUpperCase() + "]");
return Flux.from(result).log()
.then(Mono.fromSupplier(() -> ResponseEntity.ok(result)));
}
@PostMapping("/upFoos")
public Flux<Foo> upFoos(@RequestBody List<Foo> list) {
return Flux.fromIterable(list).log()
.map(value -> new Foo(value.getValue().trim().toUpperCase()));
}
@GetMapping("/uppercase/{id}")
public Mono<ResponseEntity<?>> uppercaseGet(@PathVariable String id) {
return Mono.just(id).map(value -> "[" + value.trim().toUpperCase() + "]")
.flatMap(body -> Mono.just(ResponseEntity.ok(body)));
}
@GetMapping("/wrap/{id}")
public Mono<ResponseEntity<?>> wrapGet(@PathVariable int id) {
return Mono.just(id).log().map(value -> ".." + value + "..")
.flatMap(body -> Mono.just(ResponseEntity.ok(body)));
}
@GetMapping("/entity/{id}")
public Mono<Map<String, Object>> entity(@PathVariable Integer id) {
return Mono.just(id).log()
.map(value -> Collections.singletonMap("value", value));
}
@PostMapping("/maps")
public Flux<Map<String, String>> maps(
@RequestBody List<Map<String, String>> flux) {
return Flux.fromIterable(flux).map(value -> {
value.put("value", value.get("value").trim().toUpperCase());
return value;
});
}
@GetMapping({ "/words", "/get/more" })
public Flux<Object> words() {
return Flux.fromArray(new String[] { "foo", "bar" });
}
@GetMapping("/foos")
public Flux<Foo> foos() {
return Flux.just(new Foo("foo"), new Foo("bar"));
}
@PostMapping("/updates")
@ResponseStatus(HttpStatus.ACCEPTED)
public Flux<?> updates(@RequestBody List<String> list) {
Flux<String> flux = Flux.fromIterable(list).cache();
flux.subscribe(value -> this.list.add(value));
return flux;
}
@PostMapping("/addFoos")
@ResponseStatus(HttpStatus.ACCEPTED)
public Flux<Foo> addFoos(@RequestBody List<Foo> list) {
Flux<Foo> flux = Flux.fromIterable(list).cache();
flux.subscribe(value -> this.list.add(value.getValue()));
return flux;
}
@GetMapping("/bang")
public Flux<?> bang() {
return Flux.fromArray(new String[] { "foo", "bar" }).map(value -> {
if (value.equals("bar")) {
throw new RuntimeException("Bar");
}
return value;
});
}
@GetMapping("/empty")
public Flux<?> empty() {
return Flux.fromIterable(Collections.emptyList());
}
@GetMapping("/timeout")
public Flux<?> timeout() {
return Flux.defer(() -> Flux.<String>create(emitter -> {
emitter.next("foo");
}).timeout(Duration.ofMillis(100L), Flux.empty()));
}
@GetMapping("/sentences")
public Flux<List<String>> sentences() {
return Flux.just(Arrays.asList("go", "home"), Arrays.asList("come", "back"));
}
}
public static class Foo {
private String value;
public Foo(String value) {
this.value = value;
}
Foo() {
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
}

View File

@@ -61,8 +61,8 @@ import reactor.core.publisher.Mono;
* @author Dave Syer
*
*/
@SpringBootTest(classes = TestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=servlet")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class MvcRestApplicationTests {
private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream");
@@ -253,11 +253,13 @@ public class MvcRestApplicationTests {
@Test
public void uppercaseJsonStream() throws Exception {
assertThat(rest
.exchange(RequestEntity.post(new URI("/maps"))
.contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class)
.getBody()).isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
assertThat(
rest.exchange(
RequestEntity.post(new URI("/maps"))
.contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"),
String.class).getBody())
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test

View File

@@ -13,34 +13,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
package org.springframework.cloud.function.web.flux;
import java.net.URI;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.cloud.function.web.RestApplication;
import org.springframework.cloud.function.web.flux.HeadersToMessageTests.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
*
* @author Oleg Zhurakousky
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = {"spring.cloud.function.web.path=/functions" })
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = {
"spring.cloud.function.web.path=/functions",
"spring.main.web-application-type=reactive" })
@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class})
public class HeadersToMessageTests {
@Autowired
@@ -48,26 +55,27 @@ public class HeadersToMessageTests {
@Test
public void testBodyAndCustomHeaderFromMessagePropagation() throws Exception {
ResponseEntity<String> postForEntity = rest.postForEntity(new URI("/functions/employee"), "{\"name\":\"Bob\",\"age\":25}", String.class);
ResponseEntity<String> postForEntity = rest.postForEntity(
new URI("/functions/employee"), "{\"name\":\"Bob\",\"age\":25}",
String.class);
assertEquals("{\"name\":\"Bob\",\"age\":25}", postForEntity.getBody());
assertTrue(postForEntity.getHeaders().containsKey("x-content-type"));
assertEquals("application/xml", postForEntity.getHeaders().get("x-content-type").get(0));
assertEquals("application/xml",
postForEntity.getHeaders().get("x-content-type").get(0));
assertEquals("bar", postForEntity.getHeaders().get("foo").get(0));
}
@EnableAutoConfiguration
@org.springframework.boot.test.context.TestConfiguration
protected static class TestConfiguration {
@Bean({ "employee"})
public Function<Message<String>, Message<String>> function() {
@Bean({ "employee" })
public Function<Message<String>, Message<String>> function() {
return request -> {
Message<String> message = MessageBuilder.withPayload(request.getPayload())
.setHeader("X-Content-Type", "application/xml")
.setHeader("foo", "bar")
.build();
.setHeader("foo", "bar").build();
return message;
};
}
}
}
}

View File

@@ -0,0 +1,360 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.function.web.RestApplication;
import org.springframework.cloud.function.web.flux.HttpGetIntegrationTests.ApplicationConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=reactive")
@ContextConfiguration(classes= {RestApplication.class, ApplicationConfiguration.class})
public class HttpGetIntegrationTests {
private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM;
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate rest;
@Autowired
private ApplicationConfiguration test;
@Before
public void init() {
test.list.clear();
}
@Test
public void staticResource() {
assertThat(rest.getForObject("/test.html", String.class)).contains("<body>Test");
}
@Test
public void wordsSSE() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("/words")).accept(EVENT_STREAM).build(),
String.class).getBody()).isEqualTo(sse("foo", "bar"));
}
@Test
public void wordsJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/words"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
@Ignore("Fix error handling")
public void errorJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/bang"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void words() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void word() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("/word")).accept(MediaType.TEXT_PLAIN).build(),
String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("foo");
}
@Test
public void foos() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/foos")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]");
}
@Test
public void getMore() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/get/more")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void bareWords() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/bareWords")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void timeoutJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/timeout"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void emptyJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/empty"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[]");
}
@Test
public void sentences() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/sentences")).build(), String.class)
.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
public void sentencesAcceptAny() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(),
String.class).getBody())
.isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
public void sentencesAcceptJson() throws Exception {
ResponseEntity<String> result = rest
.exchange(
RequestEntity.get(new URI("/sentences"))
.accept(MediaType.APPLICATION_JSON).build(),
String.class);
assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
assertThat(result.getHeaders().getContentType())
.isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON);
}
@Test
public void sentencesAcceptSse() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("/sentences")).accept(EVENT_STREAM).build(),
String.class);
assertThat(result.getBody())
.isEqualTo(sse("[\"go\",\"home\"]", "[\"come\",\"back\"]"));
assertThat(result.getHeaders().getContentType().isCompatibleWith(EVENT_STREAM))
.isTrue();
}
@Test
public void postMoreFoo() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.get(new URI("/post/more/foo")).accept(MediaType.TEXT_PLAIN).build(),
String.class);
assertThat(result.getBody()).isEqualTo("(FOO)");
}
@Test
public void uppercaseGet() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.get(new URI("/uppercase/foo")).accept(MediaType.TEXT_PLAIN).build(),
String.class);
assertThat(result.getBody()).isEqualTo("(FOO)");
}
@Test
public void convertGet() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.get(new URI("/wrap/123")).accept(MediaType.TEXT_PLAIN).build(),
String.class);
assertThat(result.getBody()).isEqualTo("..123..");
}
@Test
public void supplierFirst() {
assertThat(rest.getForObject("/not/a/function", String.class))
.isEqualTo("[\"hello\"]");
}
@Test
public void convertGetJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/entity/321"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("{\"value\":321}");
}
private String sse(String... values) {
return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n";
}
@EnableAutoConfiguration
@TestConfiguration
public static class ApplicationConfiguration {
private List<String> list = new ArrayList<>();
public static void main(String[] args) throws Exception {
SpringApplication.run(HttpGetIntegrationTests.ApplicationConfiguration.class,
args);
}
@Bean({ "uppercase", "post/more" })
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux.log()
.map(value -> "(" + value.trim().toUpperCase() + ")");
}
@Bean
public Function<Flux<Integer>, Flux<String>> wrap() {
return flux -> flux.log().map(value -> ".." + value + "..");
}
@Bean
public Function<Flux<Integer>, Flux<Map<String, Object>>> entity() {
return flux -> flux.log()
.map(value -> Collections.singletonMap("value", value));
}
@Bean({ "words", "get/more" })
public Supplier<Flux<String>> words() {
return () -> Flux.just("foo", "bar");
}
@Bean
public Supplier<String> word() {
return () -> "foo";
}
@Bean
public Supplier<Flux<Foo>> foos() {
return () -> Flux.just(new Foo("foo"), new Foo("bar"));
}
@Bean
public Supplier<List<String>> bareWords() {
return () -> Arrays.asList("foo", "bar");
}
@Bean
public Supplier<Flux<String>> bang() {
return () -> Flux.fromArray(new String[] { "foo", "bar" }).map(value -> {
if (value.equals("bar")) {
throw new RuntimeException("Bar");
}
return value;
});
}
@Bean
public Supplier<Flux<String>> empty() {
return () -> Flux.fromIterable(Collections.emptyList());
}
@Bean("not/a/function")
public Supplier<Flux<String>> supplier() {
return () -> Flux.just("hello");
}
@Bean("not/a")
public Function<Flux<String>, Flux<String>> function() {
return input -> Flux.just("bye");
}
@Bean
public Supplier<Flux<String>> timeout() {
return () -> Flux.defer(() -> Flux.<String>create(emitter -> {
emitter.next("foo");
}).timeout(Duration.ofMillis(100L), Flux.empty()));
}
@Bean
public Supplier<Flux<List<String>>> sentences() {
return () -> Flux.just(Arrays.asList("go", "home"),
Arrays.asList("come", "back"));
}
@Bean
public Function<MultiValueMap<String, String>, Map<String, Integer>> sum() {
return valueMap -> valueMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, values -> values
.getValue().stream().mapToInt(Integer::parseInt).sum()));
}
}
public static class Foo {
private String value;
public Foo(String value) {
this.value = value;
}
Foo() {
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
}

View File

@@ -0,0 +1,441 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.function.web.RestApplication;
import org.springframework.cloud.function.web.flux.HttpPostIntegrationTests.ApplicationConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Dave Syer
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=reactive")
@ContextConfiguration(classes = { RestApplication.class, ApplicationConfiguration.class })
public class HttpPostIntegrationTests {
private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM;
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate rest;
@Autowired
private ApplicationConfiguration test;
@Before
public void init() {
test.list.clear();
}
@Test
public void qualifierFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity.post(new URI("/foos"))
.contentType(MediaType.APPLICATION_JSON).body("[\"foo\",\"bar\"]"),
String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"[FOO]\"},{\"value\":\"[BAR]\"}]");
}
@Test
public void updates() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.post(new URI("/updates")).body("[\"one\", \"two\"]"),
String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isNull();
}
@Test
public void updatesJson() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/updates")).contentType(MediaType.APPLICATION_JSON)
.body("[\"one\",\"two\"]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo(null);
}
@Test
public void addFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/addFoos")).contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo(null);
}
@Test
public void bareUpdates() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/bareUpdates")).contentType(MediaType.APPLICATION_JSON)
.body("[\"one\",\"two\"]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("[]");
}
@Test
public void uppercase() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
}
@Test
public void messages() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/messages")).contentType(MediaType.APPLICATION_JSON)
.header("x-foo", "bar").body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getHeaders().getFirst("x-foo")).isEqualTo("bar");
assertThat(result.getHeaders()).doesNotContainKey("id");
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
}
@Test
public void headers() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/headers")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getHeaders().getFirst("foo")).isEqualTo("bar");
assertThat(result.getHeaders()).doesNotContainKey("id");
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
}
@Test
public void uppercaseSingleValue() throws Exception {
ResponseEntity<String> result = rest
.exchange(
RequestEntity.post(new URI("/uppercase"))
.contentType(MediaType.TEXT_PLAIN).body("foo"),
String.class);
assertThat(result.getBody()).isEqualTo("[\"(FOO)\"]");
}
@Test
@Ignore("WebFlux would split the request body into lines: TODO make this work the same")
public void uppercasePlainText() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.post(new URI("/uppercase"))
.contentType(MediaType.TEXT_PLAIN).body("foo\nbar"),
String.class);
assertThat(result.getBody()).isEqualTo("(FOO\nBAR)");
}
@Test
public void uppercaseFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/upFoos")).contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test
public void uppercaseFoo() throws Exception {
// Single Foo can be parsed
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/upFoos")).contentType(MediaType.APPLICATION_JSON)
.body("{\"value\":\"foo\"}"), String.class);
assertThat(result.getBody()).isEqualTo("[{\"value\":\"FOO\"}]");
}
@Test
public void bareUppercaseFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/bareUpFoos")).contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test
public void bareUppercaseFoo() throws Exception {
// Single Foo can be parsed and returns a single value if the function is defined
// that way
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/bareUpFoos")).contentType(MediaType.APPLICATION_JSON)
.body("{\"value\":\"foo\"}"), String.class);
assertThat(result.getBody()).isEqualTo("{\"value\":\"FOO\"}");
}
@Test
public void bareUppercase() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/bareUppercase")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
}
@Test
public void singleValuedText() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.post(new URI("/bareUppercase")).accept(MediaType.TEXT_PLAIN)
.contentType(MediaType.TEXT_PLAIN).body("foo"),
String.class);
assertThat(result.getBody()).isEqualTo("(FOO)");
}
@Test
public void transform() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/transform")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
}
@Test
public void postMore() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/post/more")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
}
@Test
public void convertPost() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity.post(new URI("/wrap"))
.contentType(MediaType.TEXT_PLAIN).body("123"), String.class);
// Result is multi-valued so it has to come out as an array
assertThat(result.getBody()).isEqualTo("[\"..123..\"]");
}
@Test
public void convertPostJson() throws Exception {
// If you POST a single value to a Function<Flux<Integer>,Flux<Integer>> it can't
// determine if the output is single valued, so it has to send an array back
ResponseEntity<String> result = rest
.exchange(
RequestEntity.post(new URI("/doubler"))
.contentType(MediaType.TEXT_PLAIN).body("123"),
String.class);
assertThat(result.getBody()).isEqualTo("[246]");
}
@Test
public void uppercaseJsonArray() throws Exception {
assertThat(rest.exchange(
RequestEntity.post(new URI("/maps"))
.contentType(MediaType.APPLICATION_JSON)
// The new line in the middle is optional
.body("[{\"value\":\"foo\"},\n{\"value\":\"bar\"}]"),
String.class).getBody())
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test
public void uppercaseSSE() throws Exception {
assertThat(rest.exchange(RequestEntity.post(new URI("/uppercase"))
.accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class).getBody())
.isEqualTo(sse("(FOO)", "(BAR)"));
}
@Test
public void sum() throws Exception {
LinkedMultiValueMap<String, String> map = new LinkedMultiValueMap<>();
map.put("A", Arrays.asList("1", "2", "3"));
map.put("B", Arrays.asList("5", "6"));
assertThat(rest.exchange(
RequestEntity.post(new URI("/sum")).accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_FORM_URLENCODED).body(map),
String.class).getBody()).isEqualTo("[{\"A\":6,\"B\":11}]");
}
@Test
public void count() throws Exception {
List<String> list = Arrays.asList("A", "B", "A");
assertThat(rest.exchange(
RequestEntity.post(new URI("/count")).accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON).body(list),
String.class).getBody()).isEqualTo("{\"A\":2,\"B\":1}");
}
private String sse(String... values) {
return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n";
}
@EnableAutoConfiguration
@TestConfiguration
public static class ApplicationConfiguration {
private List<String> list = new ArrayList<>();
public static void main(String[] args) throws Exception {
SpringApplication.run(HttpPostIntegrationTests.ApplicationConfiguration.class,
args);
}
@Bean({ "uppercase", "transform", "post/more" })
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux.log()
.map(value -> "(" + value.trim().toUpperCase() + ")");
}
@Bean
public Function<String, String> bareUppercase() {
return value -> "(" + value.trim().toUpperCase() + ")";
}
@Bean
public Function<Message<String>, Message<String>> messages() {
return value -> MessageBuilder
.withPayload("(" + value.getPayload().trim().toUpperCase() + ")")
.copyHeaders(value.getHeaders()).build();
}
@Bean
public Function<Flux<Message<String>>, Flux<Message<String>>> headers() {
return flux -> flux.map(value -> MessageBuilder
.withPayload("(" + value.getPayload().trim().toUpperCase() + ")")
.setHeader("foo", "bar").build());
}
@Bean
public Function<Flux<Foo>, Flux<Foo>> upFoos() {
return flux -> flux.log()
.map(value -> new Foo(value.getValue().trim().toUpperCase()));
}
@Bean
public Function<Foo, Foo> bareUpFoos() {
return value -> new Foo(value.getValue().trim().toUpperCase());
}
@Bean
public Function<Flux<Integer>, Flux<String>> wrap() {
return flux -> flux.log().map(value -> ".." + value + "..");
}
@Bean
public Function<Flux<Integer>, Flux<Integer>> doubler() {
return flux -> flux.log().map(value -> 2 * value);
}
@Bean
public Function<Flux<HashMap<String, String>>, Flux<Map<String, String>>> maps() {
return flux -> flux.map(value -> {
value.put("value", value.get("value").trim().toUpperCase());
return value;
});
}
@Bean
@Qualifier("foos")
public Function<String, Foo> qualifier() {
return value -> new Foo("[" + value.trim().toUpperCase() + "]");
}
@Bean
public Consumer<Flux<String>> updates() {
return flux -> flux.subscribe(value -> list.add(value));
}
@Bean
public Consumer<Flux<Foo>> addFoos() {
return flux -> flux.subscribe(value -> list.add(value.getValue()));
}
@Bean
public Consumer<String> bareUpdates() {
return value -> list.add(value);
}
@Bean("not/a")
public Function<Flux<String>, Flux<String>> function() {
return input -> Flux.just("bye");
}
@Bean
public Function<MultiValueMap<String, String>, Map<String, Integer>> sum() {
return valueMap -> valueMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, values -> values
.getValue().stream().mapToInt(Integer::parseInt).sum()));
}
@Bean
public Function<Flux<String>, Mono<Map<String, Integer>>> count() {
return flux -> flux.collect(HashMap::new,
(map, word) -> map.merge(word, 1, Integer::sum));
}
}
public static class Foo {
private String value;
public Foo(String value) {
this.value = value;
}
Foo() {
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.web;
package org.springframework.cloud.function.web.flux;
import java.net.URI;
import java.util.function.Supplier;
@@ -28,10 +28,13 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.function.web.RestApplication;
import org.springframework.cloud.function.web.flux.PrefixTests.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
@@ -44,7 +47,9 @@ import reactor.core.publisher.Flux;
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = {
"spring.main.web-application-type=reactive",
"spring.cloud.function.web.path=/functions", "debug" })
@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class})
public class PrefixTests {
@LocalServerPort

View File

@@ -0,0 +1,102 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
import java.net.URI;
import java.util.function.Supplier;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.function.web.RestApplication;
import org.springframework.cloud.function.web.flux.SingletonTests.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=reactive")
@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class})
public class SingletonTests {
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate rest;
@Test
public void words() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@EnableAutoConfiguration
@org.springframework.boot.test.context.TestConfiguration
protected static class TestConfiguration {
@Bean
public static BeanDefinitionRegistryPostProcessor processor() {
return new BeanDefinitionRegistryPostProcessor() {
@Override
public void postProcessBeanFactory(
ConfigurableListableBeanFactory beanFactory)
throws BeansException {
}
@Override
public void postProcessBeanDefinitionRegistry(
BeanDefinitionRegistry registry) throws BeansException {
// Simulates what happens when you add a compiled function
RootBeanDefinition beanDefinition = new RootBeanDefinition(
MySupplier.class);
registry.registerBeanDefinition("words", beanDefinition);
}
};
}
}
static class MySupplier implements Supplier<Flux<String>> {
@Override
public Flux<String> get() {
return Flux.just("foo", "bar");
}
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.web;
package org.springframework.cloud.function.web.mvc;
import java.net.URI;
import java.util.function.Function;
@@ -29,10 +29,13 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.function.web.RestApplication;
import org.springframework.cloud.function.web.mvc.DefaultRouteTests.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
@@ -45,6 +48,7 @@ import reactor.core.publisher.Flux;
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "")
@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class})
public class DefaultRouteTests {
@LocalServerPort

View File

@@ -0,0 +1,82 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.mvc;
import java.net.URI;
import java.util.Map;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.cloud.function.web.RestApplication;
import org.springframework.cloud.function.web.mvc.HeadersToMessageTests.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
*
* @author Oleg Zhurakousky
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = {
"spring.main.web-application-type=servlet", "spring.cloud.function.web.path=/functions" })
@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class})
public class HeadersToMessageTests {
@Autowired
private TestRestTemplate rest;
@Test
public void testBodyAndCustomHeaderFromMessagePropagation() throws Exception {
ResponseEntity<String> postForEntity = rest.postForEntity(
new URI("/functions/employee"), "{\"name\":\"Bob\",\"age\":25}",
String.class);
assertEquals("{\"name\":\"Bob\",\"age\":25}", postForEntity.getBody());
assertTrue(postForEntity.getHeaders().containsKey("x-content-type"));
assertEquals("application/xml",
postForEntity.getHeaders().get("x-content-type").get(0));
assertEquals("bar", postForEntity.getHeaders().get("foo").get(0));
}
@EnableAutoConfiguration
@org.springframework.boot.test.context.TestConfiguration
protected static class TestConfiguration {
@Bean({ "employee" })
public Function<Message<Map<String, Object>>, Message<Map<String, Object>>> function() {
return request -> {
Message<Map<String, Object>> message = MessageBuilder
.withPayload(request.getPayload())
.setHeader("X-Content-Type", "application/xml")
.setHeader("foo", "bar").build();
return message;
};
}
}
}

View File

@@ -0,0 +1,350 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.mvc;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.function.web.RestApplication;
import org.springframework.cloud.function.web.mvc.HttpGetIntegrationTests.ApplicationConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties="spring.main.web-application-type=servlet")
@ContextConfiguration(classes= {RestApplication.class, ApplicationConfiguration.class})
public class HttpGetIntegrationTests {
private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM;
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate rest;
@Autowired
private ApplicationConfiguration test;
@Before
public void init() {
test.list.clear();
}
@Test
public void staticResource() {
assertThat(rest.getForObject("/test.html", String.class)).contains("<body>Test");
}
@Test
public void wordsSSE() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("/words")).accept(EVENT_STREAM).build(),
String.class).getBody()).isEqualTo(sse("foo", "bar"));
}
@Test
public void wordsJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/words"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
@Ignore("Fix error handling")
public void errorJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/bang"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void words() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void word() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/word")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("foo");
}
@Test
public void foos() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/foos")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]");
}
@Test
public void getMore() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/get/more")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void bareWords() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/bareWords")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void timeoutJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/timeout"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void emptyJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/empty"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[]");
}
@Test
public void sentences() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/sentences")).build(), String.class)
.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
public void sentencesAcceptAny() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(),
String.class).getBody())
.isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
public void sentencesAcceptJson() throws Exception {
ResponseEntity<String> result = rest
.exchange(
RequestEntity.get(new URI("/sentences"))
.accept(MediaType.APPLICATION_JSON).build(),
String.class);
assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
assertThat(result.getHeaders().getContentType())
.isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON);
}
@Test
public void sentencesAcceptSse() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("/sentences")).accept(EVENT_STREAM).build(),
String.class);
assertThat(result.getBody())
.isEqualTo(sse("[\"go\",\"home\"]", "[\"come\",\"back\"]"));
assertThat(result.getHeaders().getContentType().isCompatibleWith(EVENT_STREAM))
.isTrue();
}
@Test
public void postMoreFoo() {
assertThat(rest.getForObject("/post/more/foo", String.class)).isEqualTo("(FOO)");
}
@Test
public void uppercaseGet() {
assertThat(rest.getForObject("/uppercase/foo", String.class)).isEqualTo("(FOO)");
}
@Test
public void convertGet() {
assertThat(rest.getForObject("/wrap/123", String.class)).isEqualTo("..123..");
}
@Test
public void supplierFirst() {
assertThat(rest.getForObject("/not/a/function", String.class))
.isEqualTo("[\"hello\"]");
}
@Test
public void convertGetJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/entity/321"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("{\"value\":321}");
}
private String sse(String... values) {
return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n";
}
@EnableAutoConfiguration
@TestConfiguration
public static class ApplicationConfiguration {
private List<String> list = new ArrayList<>();
public static void main(String[] args) throws Exception {
SpringApplication.run(HttpGetIntegrationTests.ApplicationConfiguration.class,
args);
}
@Bean({ "uppercase", "post/more" })
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux.log()
.map(value -> "(" + value.trim().toUpperCase() + ")");
}
@Bean
public Function<Flux<Integer>, Flux<String>> wrap() {
return flux -> flux.log().map(value -> ".." + value + "..");
}
@Bean
public Function<Flux<Integer>, Flux<Map<String, Object>>> entity() {
return flux -> flux.log()
.map(value -> Collections.singletonMap("value", value));
}
@Bean({ "words", "get/more" })
public Supplier<Flux<String>> words() {
return () -> Flux.just("foo", "bar");
}
@Bean
public Supplier<String> word() {
return () -> "foo";
}
@Bean
public Supplier<Flux<Foo>> foos() {
return () -> Flux.just(new Foo("foo"), new Foo("bar"));
}
@Bean
public Supplier<List<String>> bareWords() {
return () -> Arrays.asList("foo", "bar");
}
@Bean
public Supplier<Flux<String>> bang() {
return () -> Flux.fromArray(new String[] { "foo", "bar" }).map(value -> {
if (value.equals("bar")) {
throw new RuntimeException("Bar");
}
return value;
});
}
@Bean
public Supplier<Flux<String>> empty() {
return () -> Flux.fromIterable(Collections.emptyList());
}
@Bean("not/a/function")
public Supplier<Flux<String>> supplier() {
return () -> Flux.just("hello");
}
@Bean("not/a")
public Function<Flux<String>, Flux<String>> function() {
return input -> Flux.just("bye");
}
@Bean
public Supplier<Flux<String>> timeout() {
return () -> Flux.defer(() -> Flux.<String>create(emitter -> {
emitter.next("foo");
}).timeout(Duration.ofMillis(100L), Flux.empty()));
}
@Bean
public Supplier<Flux<List<String>>> sentences() {
return () -> Flux.just(Arrays.asList("go", "home"),
Arrays.asList("come", "back"));
}
@Bean
public Function<MultiValueMap<String, String>, Map<String, Integer>> sum() {
return valueMap -> valueMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, values -> values
.getValue().stream().mapToInt(Integer::parseInt).sum()));
}
}
public static class Foo {
private String value;
public Foo(String value) {
this.value = value;
}
Foo() {
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
}

View File

@@ -13,19 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web;
package org.springframework.cloud.function.web.mvc;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.junit.Before;
@@ -42,6 +39,8 @@ import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.function.web.RestApplication;
import org.springframework.cloud.function.web.mvc.HttpPostIntegrationTests.ApplicationConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
@@ -49,6 +48,7 @@ import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
@@ -63,8 +63,9 @@ import reactor.core.publisher.Mono;
* @author Dave Syer
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class RestApplicationTests {
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties="spring.main.web-application-type=servlet")
@ContextConfiguration(classes= {RestApplication.class, ApplicationConfiguration.class})
public class HttpPostIntegrationTests {
private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM;
@LocalServerPort
@@ -78,61 +79,6 @@ public class RestApplicationTests {
public void init() {
test.list.clear();
}
@Test
public void staticResource() {
assertThat(rest.getForObject("/test.html", String.class)).contains("<body>Test");
}
@Test
public void wordsSSE() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("/words")).accept(EVENT_STREAM).build(),
String.class).getBody()).isEqualTo(sse("foo", "bar"));
}
@Test
public void wordsJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/words"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
@Ignore("Fix error handling")
public void errorJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/bang"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void words() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void word() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/word")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("foo");
}
@Test
public void foos() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/foos")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]");
}
@Test
public void qualifierFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity.post(new URI("/foos"))
@@ -144,29 +90,13 @@ public class RestApplicationTests {
}
@Test
public void getMore() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/get/more")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void bareWords() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/bareWords")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
@Ignore("Should this even work? Or do we need to be explicit about the JSON?")
public void updates() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.post(new URI("/updates")).body("one\ntwo"), String.class);
RequestEntity.post(new URI("/updates")).body("[\"one\", \"two\"]"),
String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("onetwo");
assertThat(result.getBody()).isNull();
}
@Test
@@ -176,7 +106,7 @@ public class RestApplicationTests {
.body("[\"one\",\"two\"]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]");
assertThat(result.getBody()).isEqualTo(null);
}
@Test
@@ -187,7 +117,7 @@ public class RestApplicationTests {
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]");
.isEqualTo(null);
}
@Test
@@ -200,60 +130,6 @@ public class RestApplicationTests {
assertThat(result.getBody()).isEqualTo("[]");
}
@Test
public void timeoutJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/timeout"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void emptyJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/empty"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[]");
}
@Test
public void sentences() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/sentences")).build(), String.class)
.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
public void sentencesAcceptAny() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(),
String.class).getBody())
.isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
public void sentencesAcceptJson() throws Exception {
ResponseEntity<String> result = rest
.exchange(
RequestEntity.get(new URI("/sentences"))
.accept(MediaType.APPLICATION_JSON).build(),
String.class);
assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
assertThat(result.getHeaders().getContentType())
.isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON);
}
@Test
public void sentencesAcceptSse() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("/sentences")).accept(EVENT_STREAM).build(),
String.class);
assertThat(result.getBody())
.isEqualTo(sse("[\"go\",\"home\"]", "[\"come\",\"back\"]"));
assertThat(result.getHeaders().getContentType().isCompatibleWith(EVENT_STREAM))
.isTrue();
}
@Test
public void uppercase() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
@@ -263,10 +139,11 @@ public class RestApplicationTests {
}
@Test
// @Ignore("FIXME")
public void messages() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/messages")).contentType(MediaType.APPLICATION_JSON)
// Remove this when Spring 5.0.8 is used
.accept(MediaType.valueOf("application/stream+json"))
.header("x-foo", "bar").body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getHeaders().getFirst("x-foo")).isEqualTo("bar");
assertThat(result.getHeaders()).doesNotContainKey("id");
@@ -277,10 +154,12 @@ public class RestApplicationTests {
public void headers() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/headers")).contentType(MediaType.APPLICATION_JSON)
// Remove this when Spring 5.0.8 is used
.accept(MediaType.valueOf("application/stream+json"))
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
assertThat(result.getHeaders().getFirst("foo")).isEqualTo("bar");
assertThat(result.getHeaders()).doesNotContainKey("id");
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
}
@Test
@@ -290,7 +169,8 @@ public class RestApplicationTests {
RequestEntity.post(new URI("/uppercase"))
.contentType(MediaType.TEXT_PLAIN).body("foo"),
String.class);
assertThat(result.getBody()).isEqualTo("(FOO)");
// Result is multi-valued so it has to come out as an array
assertThat(result.getBody()).isEqualTo("[\"(FOO)\"]");
}
@Test
@@ -374,26 +254,12 @@ public class RestApplicationTests {
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
}
@Test
public void postMoreFoo() {
assertThat(rest.getForObject("/post/more/foo", String.class)).isEqualTo("(FOO)");
}
@Test
public void uppercaseGet() {
assertThat(rest.getForObject("/uppercase/foo", String.class)).isEqualTo("(FOO)");
}
@Test
public void convertGet() {
assertThat(rest.getForObject("/wrap/123", String.class)).isEqualTo("..123..");
}
@Test
public void convertPost() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity.post(new URI("/wrap"))
.contentType(MediaType.TEXT_PLAIN).body("123"), String.class);
assertThat(result.getBody()).isEqualTo("..123..");
// Result is multi-valued so it has to come out as an array
assertThat(result.getBody()).isEqualTo("[\"..123..\"]");
}
@Test
@@ -408,20 +274,6 @@ public class RestApplicationTests {
assertThat(result.getBody()).isEqualTo("[246]");
}
@Test
public void supplierFirst() {
assertThat(rest.getForObject("/not/a/function", String.class))
.isEqualTo("[\"hello\"]");
}
@Test
public void convertGetJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/entity/321"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("{\"value\":321}");
}
@Test
public void uppercaseJsonArray() throws Exception {
assertThat(rest.exchange(
@@ -475,7 +327,7 @@ public class RestApplicationTests {
private List<String> list = new ArrayList<>();
public static void main(String[] args) throws Exception {
SpringApplication.run(RestApplicationTests.ApplicationConfiguration.class,
SpringApplication.run(HttpPostIntegrationTests.ApplicationConfiguration.class,
args);
}
@@ -525,12 +377,6 @@ public class RestApplicationTests {
return flux -> flux.log().map(value -> 2 * value);
}
@Bean
public Function<Flux<Integer>, Flux<Map<String, Object>>> entity() {
return flux -> flux.log()
.map(value -> Collections.singletonMap("value", value));
}
@Bean
public Function<Flux<HashMap<String, String>>, Flux<Map<String, String>>> maps() {
return flux -> flux.map(value -> {
@@ -539,32 +385,12 @@ public class RestApplicationTests {
});
}
@Bean({ "words", "get/more" })
public Supplier<Flux<String>> words() {
return () -> Flux.just("foo", "bar");
}
@Bean
public Supplier<String> word() {
return () -> "foo";
}
@Bean
public Supplier<Flux<Foo>> foos() {
return () -> Flux.just(new Foo("foo"), new Foo("bar"));
}
@Bean
@Qualifier("foos")
public Function<String, Foo> qualifier() {
return value -> new Foo("[" + value.trim().toUpperCase() + "]");
}
@Bean
public Supplier<List<String>> bareWords() {
return () -> Arrays.asList("foo", "bar");
}
@Bean
public Consumer<Flux<String>> updates() {
return flux -> flux.subscribe(value -> list.add(value));
@@ -580,44 +406,11 @@ public class RestApplicationTests {
return value -> list.add(value);
}
@Bean
public Supplier<Flux<String>> bang() {
return () -> Flux.fromArray(new String[] { "foo", "bar" }).map(value -> {
if (value.equals("bar")) {
throw new RuntimeException("Bar");
}
return value;
});
}
@Bean
public Supplier<Flux<String>> empty() {
return () -> Flux.fromIterable(Collections.emptyList());
}
@Bean("not/a/function")
public Supplier<Flux<String>> supplier() {
return () -> Flux.just("hello");
}
@Bean("not/a")
public Function<Flux<String>, Flux<String>> function() {
return input -> Flux.just("bye");
}
@Bean
public Supplier<Flux<String>> timeout() {
return () -> Flux.defer(() -> Flux.<String>create(emitter -> {
emitter.next("foo");
}).timeout(Duration.ofMillis(100L), Flux.empty()));
}
@Bean
public Supplier<Flux<List<String>>> sentences() {
return () -> Flux.just(Arrays.asList("go", "home"),
Arrays.asList("come", "back"));
}
@Bean
public Function<MultiValueMap<String, String>, Map<String, Integer>> sum() {
return valueMap -> valueMap.entrySet().stream()

View File

@@ -0,0 +1,83 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.mvc;
import java.net.URI;
import java.util.function.Supplier;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.function.web.RestApplication;
import org.springframework.cloud.function.web.mvc.PrefixTests.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = {
"spring.main.web-application-type=servlet",
"spring.cloud.function.web.path=/functions" })
@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class})
public class PrefixTests {
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate rest;
@Test
public void words() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("/functions/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void missing() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
@EnableAutoConfiguration
@org.springframework.boot.test.context.TestConfiguration
protected static class TestConfiguration {
@Bean({ "words", "get/more" })
public Supplier<Flux<String>> words() {
return () -> Flux.fromArray(new String[] { "foo", "bar" });
}
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.web;
package org.springframework.cloud.function.web.mvc;
import java.net.URI;
import java.util.function.Supplier;
@@ -33,10 +33,13 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.function.web.RestApplication;
import org.springframework.cloud.function.web.mvc.SingletonTests.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
@@ -49,6 +52,7 @@ import reactor.core.publisher.Flux;
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@ContextConfiguration(classes= {RestApplication.class, TestConfiguration.class})
public class SingletonTests {
@LocalServerPort