Alternative approach to MVC handling

Doesn't rely on manipulating the FunctionCatalog, and does type
conversion/coercion in the MVC layer.
This commit is contained in:
Dave Syer
2017-04-24 13:25:21 +01:00
parent 5055369cb5
commit 4686e450b1
19 changed files with 1023 additions and 750 deletions

View File

@@ -1,323 +0,0 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.http.converter.HttpMessageNotWritableException;
import reactor.core.publisher.Flux;
/**
* Converter for request bodies of type <code>Flux<String></code>.
*
* @author Dave Syer
*
*/
public class FluxHttpMessageConverter implements HttpMessageConverter<Flux<Object>> {
private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream");
@Override
public boolean canRead(Class<?> clazz, MediaType mediaType) {
return Flux.class.isAssignableFrom(clazz);
}
@Override
public boolean canWrite(Class<?> clazz, MediaType mediaType) {
return false;
}
@Override
public List<MediaType> getSupportedMediaTypes() {
return Arrays.asList(MediaType.ALL);
}
@Override
public Flux<Object> read(Class<? extends Flux<Object>> clazz,
HttpInputMessage inputMessage)
throws IOException, HttpMessageNotReadableException {
MediaType mediaType = inputMessage.getHeaders().getContentType();
if (mediaType != null) {
if (!MediaType.ALL.equals(mediaType) && mediaType.includes(MediaType.APPLICATION_JSON)) {
return new JsonObjectDecoder().decode(inputMessage.getBody());
}
if (mediaType.includes(EVENT_STREAM)) {
return splitOnSseData(inputMessage);
}
}
return splitOnLineEndings(inputMessage);
}
private Flux<Object> splitOnLineEndings(HttpInputMessage inputMessage) {
return Flux.create(sink -> {
BufferedReader reader;
try {
reader = new BufferedReader(
new InputStreamReader(inputMessage.getBody()));
String line = reader.readLine();
while (line != null) {
sink.next(line);
line = reader.readLine();
}
}
catch (IOException e) {
sink.error(e);
}
sink.complete();
});
}
private Flux<Object> splitOnSseData(HttpInputMessage inputMessage) {
return Flux.create(sink -> {
BufferedReader reader;
StringBuffer buffer = new StringBuffer();
int emptyCount = 0;
try {
reader = new BufferedReader(
new InputStreamReader(inputMessage.getBody()));
String line = reader.readLine();
while (line != null) {
if (line.length() == 0) {
emptyCount++;
}
else {
if (buffer.length() == 0) {
if (line.startsWith("data:")) {
line = line.length() > "data:".length()
? line.substring("data:".length()) : "";
}
}
else {
buffer.append("\n");
}
buffer.append(line);
}
if (emptyCount > 0) {
sink.next(buffer.toString());
buffer.setLength(0);
emptyCount = 0;
while (line != null && line.length() == 0) {
line = reader.readLine();
}
}
else {
line = reader.readLine();
}
}
if (buffer.length()>0) {
sink.next(buffer.toString());
}
}
catch (IOException e) {
sink.error(e);
}
sink.complete();
});
}
@Override
public void write(Flux<Object> t, MediaType contentType,
HttpOutputMessage outputMessage)
throws IOException, HttpMessageNotWritableException {
}
static class JsonObjectDecoder {
private static final int ST_CORRUPTED = -1;
private static final int ST_INIT = 0;
private static final int ST_DECODING_NORMAL = 1;
private static final int ST_DECODING_ARRAY_STREAM = 2;
private final int maxObjectLength = 1024 * 1024;
private int openBraces;
private int state;
private boolean insideString;
private int writerIndex;
private boolean streamArrayElements = true;
public Flux<Object> decode(InputStream body) {
InputStreamReader reader = new InputStreamReader(body);
char[] buffer = new char[1024];
try {
List<String> chunks = new ArrayList<>();
int read = reader.read(buffer);
this.writerIndex += read;
while (read >= 0) {
if (this.state == ST_CORRUPTED) {
return Flux.error(new IllegalStateException("Corrupted stream"));
}
if (this.writerIndex > maxObjectLength) {
// buffer size exceeded maxObjectLength; discarding the complete
// buffer.
reset();
return Flux.error(new IllegalStateException(
"object length exceeds " + maxObjectLength + ": "
+ this.writerIndex + " bytes discarded"));
}
int point = 0;
for (int index = 0; index < read; index++) {
char c = buffer[index];
if (this.state == ST_DECODING_NORMAL) {
decodeByte(c, buffer, index);
// All opening braces/brackets have been closed. That's enough
// to conclude that the JSON object/array is complete.
if (this.openBraces == 0) {
char[] json = extractObject(buffer, point,
index + 1 - point);
if (json != null) {
chunks.add(new String(json));
}
// The JSON object/array was extracted => discard the
// bytes from the input buffer.
point += index + 1 - point;
// Reset the object state to get ready for the next JSON
// object/text coming along the byte stream.
reset();
}
}
else if (this.state == ST_DECODING_ARRAY_STREAM) {
decodeByte(c, buffer, index);
if (!this.insideString && (this.openBraces == 1 && c == ','
|| this.openBraces == 0 && c == ']')) {
// skip leading spaces. No range check is needed and the
// loop will terminate because the byte at position index
// is not a whitespace.
for (int i = point; Character
.isWhitespace(buffer[i]); i++) {
point++;
}
// skip trailing spaces.
int idxNoSpaces = index - 1;
while (idxNoSpaces >= 0
&& Character.isWhitespace(buffer[idxNoSpaces])) {
idxNoSpaces--;
}
char[] json = extractObject(buffer, point,
idxNoSpaces + 1 - point);
if (json != null) {
chunks.add(new String(json));
}
point += index + 1 - point;
if (c == ']') {
reset();
}
}
// JSON object/array detected. Accumulate bytes until all
// braces/brackets are closed.
}
else if (c == '{' || c == '[') {
initDecoding(c, this.streamArrayElements);
if (this.state == ST_DECODING_ARRAY_STREAM) {
// Discard the array bracket
point++;
}
// Discard leading spaces in front of a JSON object/array.
}
else if (Character.isWhitespace(c)) {
point++;
}
else {
this.state = ST_CORRUPTED;
return Flux.error(new IllegalStateException(
"invalid JSON received at byte position "
+ writerIndex));
}
}
read = reader.read(buffer);
}
return Flux.fromIterable(chunks);
}
catch (IOException e) {
return Flux.error(new IllegalStateException("Cannot read stream", e));
}
}
private char[] extractObject(char[] buffer, int index, int length) {
if (length <= 0) {
return null;
}
return Arrays.copyOfRange(buffer, index, index + length);
}
private void decodeByte(char c, char[] input, int index) {
if ((c == '{' || c == '[') && !this.insideString) {
this.openBraces++;
}
else if ((c == '}' || c == ']') && !this.insideString) {
this.openBraces--;
}
else if (c == '"') {
// start of a new JSON string. It's necessary to detect strings as they
// may also contain braces/brackets and that could lead to incorrect
// results.
if (!this.insideString) {
this.insideString = true;
// If the double quote wasn't escaped then this is the end of a
// string.
}
else if (input[index - 1] != '\\') {
this.insideString = false;
}
}
}
private void initDecoding(char openingBrace, boolean streamArrayElements) {
this.openBraces = 1;
if (openingBrace == '[' && streamArrayElements) {
this.state = ST_DECODING_ARRAY_STREAM;
}
else {
this.state = ST_DECODING_NORMAL;
}
}
private void reset() {
this.insideString = false;
this.state = ST_INIT;
this.openBraces = 0;
}
}
}

View File

@@ -14,15 +14,17 @@
* limitations under the License.
*/
package org.springframework.cloud.function.web;
package org.springframework.cloud.function.web.flux;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.support.FluxSupplier;
import org.springframework.cloud.function.support.FunctionUtils;
import org.springframework.cloud.function.web.flux.request.FluxRequest;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
@@ -42,25 +44,30 @@ import reactor.core.publisher.Mono;
*/
@Component
public class FunctionController {
private FunctionInspector inspector;
@Value("${debug:${DEBUG:false}}")
private boolean debug = false;
public FunctionController(FunctionInspector inspector) {
this.inspector = inspector;
}
@PostMapping(path = "/**")
@ResponseBody
public ResponseEntity<Flux<String>> post(
@RequestAttribute(required = false, name = "org.springframework.cloud.function.web.FunctionHandlerMapping.function") Function<Flux<?>, Flux<?>> function,
@RequestAttribute(required = false, name = "org.springframework.cloud.function.web.FunctionHandlerMapping.consumer") Consumer<Flux<?>> consumer,
@RequestBody Flux<String> body) {
public ResponseEntity<Flux<?>> post(
@RequestAttribute(required = false, name = "org.springframework.cloud.function.web.flux.FunctionHandlerMapping.function") Function<Flux<?>, Flux<?>> function,
@RequestAttribute(required = false, name = "org.springframework.cloud.function.web.flux.FunctionHandlerMapping.consumer") Consumer<Flux<?>> consumer,
@RequestBody FluxRequest<?> body) {
if (function != null) {
@SuppressWarnings("unchecked")
Flux<String> result = (Flux<String>) function.apply(body);
Flux<?> result = (Flux<?>) function.apply(body.flux());
return ResponseEntity.ok().body(debug ? result.log() : result);
}
if (consumer != null) {
body = body.cache(); // send a copy back to the caller
consumer.accept(body);
return ResponseEntity.status(HttpStatus.ACCEPTED).body(body);
Flux<?> flux = body.flux().cache(); // send a copy back to the caller
consumer.accept(flux);
return ResponseEntity.status(HttpStatus.ACCEPTED).body(flux);
}
throw new IllegalArgumentException("no such function");
}
@@ -68,9 +75,9 @@ public class FunctionController {
@GetMapping(path = "/**")
@ResponseBody
public Object get(
@RequestAttribute(required = false, name = "org.springframework.cloud.function.web.FunctionHandlerMapping.function") Function<Flux<?>, Flux<?>> function,
@RequestAttribute(required = false, name = "org.springframework.cloud.function.web.FunctionHandlerMapping.supplier") Supplier<Flux<?>> supplier,
@RequestAttribute(required = false, name = "org.springframework.cloud.function.web.FunctionHandlerMapping.argument") String argument) {
@RequestAttribute(required = false, name = "org.springframework.cloud.function.web.flux.FunctionHandlerMapping.function") Function<Flux<?>, Flux<?>> function,
@RequestAttribute(required = false, name = "org.springframework.cloud.function.web.flux.FunctionHandlerMapping.supplier") Supplier<Flux<?>> supplier,
@RequestAttribute(required = false, name = "org.springframework.cloud.function.web.flux.FunctionHandlerMapping.argument") String argument) {
if (function != null) {
return value(function, argument);
}
@@ -78,18 +85,18 @@ public class FunctionController {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private Flux<String> supplier(Supplier<Flux<?>> supplier) {
private Flux<?> supplier(Supplier<Flux<?>> supplier) {
if (!FunctionUtils.isFluxSupplier(supplier)) {
supplier = new FluxSupplier(supplier);
}
Flux<String> result = (Flux<String>) supplier.get();
Flux<?> result = supplier.get();
return debug ? result.log() : result;
}
private Mono<String> value(Function<Flux<?>, Flux<?>> function,
private Mono<?> value(Function<Flux<?>, Flux<?>> function,
@PathVariable String value) {
@SuppressWarnings({ "unchecked" })
Mono<String> result = Mono.from((Flux<String>) function.apply(Flux.just(value)));
Object input = inspector.convert(inspector.getName(function), value);
Mono<?> result = Mono.from(function.apply(Flux.just(input)));
return debug ? result.log() : result;
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.web;
package org.springframework.cloud.function.web.flux;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -26,7 +26,9 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.cloud.function.web.flux.request.FluxHandlerMethodArgumentResolver;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerMapping;
@@ -57,10 +59,10 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
private String prefix = "";
@Autowired
public FunctionHandlerMapping(FunctionCatalog catalog) {
public FunctionHandlerMapping(FunctionCatalog catalog, FunctionInspector inspector) {
this.functions = catalog;
setOrder(super.getOrder() - 5);
this.controller = new FunctionController();
this.controller = new FunctionController(inspector);
}
@Override
@@ -87,10 +89,14 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
if (path == null) {
return handler;
}
if (findFunctionForGet(request, path) != null) {
Object function = findFunctionForGet(request, path);
if (function != null) {
request.setAttribute(FluxHandlerMethodArgumentResolver.HANDLER, function);
return handler;
}
if (findFunctionForPost(request, path) != null) {
function = findFunctionForPost(request, path);
if (function != null) {
request.setAttribute(FluxHandlerMethodArgumentResolver.HANDLER, function);
return handler;
}
return null;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.web;
package org.springframework.cloud.function.web.flux;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;

View File

@@ -16,20 +16,32 @@
package org.springframework.cloud.function.web.flux;
import java.util.ArrayList;
import java.util.List;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.autoconfigure.web.HttpMessageConverters;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.cloud.function.web.flux.request.FluxHandlerMethodArgumentResolver;
import org.springframework.cloud.function.web.flux.response.FluxReturnValueHandler;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.http.converter.ObjectToStringHttpMessageConverter;
import org.springframework.util.ClassUtils;
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler;
import org.springframework.web.method.support.HandlerMethodArgumentResolver;
import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter;
import reactor.core.publisher.Flux;
@@ -40,7 +52,7 @@ import reactor.core.publisher.Flux;
@Configuration
@ConditionalOnWebApplication
@ConditionalOnClass({ Flux.class, AsyncHandlerMethodReturnValueHandler.class })
public class ReactorAutoConfiguration extends WebMvcConfigurerAdapter {
public class ReactorAutoConfiguration {
@Autowired
private ApplicationContext context;
@@ -51,23 +63,59 @@ public class ReactorAutoConfiguration extends WebMvcConfigurerAdapter {
}
@Bean
public FluxReturnValueHandler fluxReturnValueHandler(
HttpMessageConverters converters) {
return new FluxReturnValueHandler(converters.getConverters());
public FunctionHandlerMapping functionHandlerMapping(FunctionCatalog catalog,
FunctionInspector inspector) {
return new FunctionHandlerMapping(catalog, inspector);
}
@Configuration
protected static class FluxMessageConverterConfiguration {
@ConditionalOnMissingClass("org.springframework.core.ReactiveAdapter")
protected static class FluxReturnValueConfiguration {
@Bean
public FluxHttpMessageConverter fluxHttpMessageConverter() {
return new FluxHttpMessageConverter();
public FluxReturnValueHandler fluxReturnValueHandler(
HttpMessageConverters converters) {
return new FluxReturnValueHandler(converters.getConverters());
}
}
@Override
public void addReturnValueHandlers(
List<HandlerMethodReturnValueHandler> returnValueHandlers) {
returnValueHandlers.add(context.getBean(FluxReturnValueHandler.class));
@Configuration
protected static class FluxArgumentResolverConfiguration {
@Bean
public FluxHandlerMethodArgumentResolver fluxHandlerMethodArgumentResolver(
FunctionInspector inspector, ObjectMapper mapper) {
return new FluxHandlerMethodArgumentResolver(inspector, mapper);
}
}
@Bean
public BeanPostProcessor fluxRequestMappingHandlerAdapterProcessor() {
return new BeanPostProcessor() {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
if (bean instanceof RequestMappingHandlerAdapter) {
RequestMappingHandlerAdapter adapter = (RequestMappingHandlerAdapter) bean;
List<HandlerMethodArgumentResolver> resolvers = new ArrayList<>(
adapter.getArgumentResolvers());
resolvers.add(0,
context.getBean(FluxHandlerMethodArgumentResolver.class));
adapter.setArgumentResolvers(resolvers);
if (!ClassUtils.isPresent("org.springframework.core.ReactiveAdapter",
null)) {
List<HandlerMethodReturnValueHandler> handlers = new ArrayList<>(
adapter.getReturnValueHandlers());
handlers.add(0, context.getBean(FluxReturnValueHandler.class));
adapter.setReturnValueHandlers(handlers);
}
}
return bean;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
return bean;
}
};
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux.request;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.cloud.function.context.FunctionInspector;
public abstract class DelegateHandler<T> {
private final ListableBeanFactory factory;
private FunctionInspector processor;
private final Object source;
public DelegateHandler(ListableBeanFactory factory, Object source) {
this.factory = factory;
this.source = source;
}
public Class<?> type() {
String name = source instanceof String ? (String) source
: processor.getName(source);
return (Class<?>) processor().getInputType(name);
}
private FunctionInspector processor() {
if (processor == null) {
processor = factory.getBean(FunctionInspector.class);
}
return processor;
}
}

View File

@@ -0,0 +1,82 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux.request;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.core.MethodParameter;
import org.springframework.core.Ordered;
import org.springframework.web.bind.support.WebDataBinderFactory;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.method.support.HandlerMethodArgumentResolver;
import org.springframework.web.method.support.ModelAndViewContainer;
/**
* Converter for request bodies of type <code>Flux<String></code>.
*
* @author Dave Syer
*
*/
public class FluxHandlerMethodArgumentResolver
implements HandlerMethodArgumentResolver, Ordered {
public static final String HANDLER = FluxHandlerMethodArgumentResolver.class.getName()
+ ".HANDLER";
private final ObjectMapper mapper;
private FunctionInspector inspector;
public FluxHandlerMethodArgumentResolver(FunctionInspector inspector,
ObjectMapper mapper) {
this.inspector = inspector;
this.mapper = mapper;
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
@Override
public Object resolveArgument(MethodParameter parameter,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest,
WebDataBinderFactory binderFactory) throws Exception {
Object handler = webRequest.getAttribute(HANDLER, NativeWebRequest.SCOPE_REQUEST);
Class<?> type = inspector.getInputType(inspector.getName(handler));
if (type == null) {
type = Object.class;
}
List<Object> body = mapper.readValue(
webRequest.getNativeRequest(HttpServletRequest.class).getInputStream(),
mapper.getTypeFactory().constructCollectionLikeType(ArrayList.class,
type));
return new FluxRequest<Object>(body);
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return FluxRequest.class.isAssignableFrom(parameter.getParameterType());
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux.request;
import java.util.List;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
public class FluxRequest<T> {
private List<T> body;
public FluxRequest(List<T> body) {
this.body = body;
}
public Flux<T> flux() {
return Flux.fromIterable(body);
}
public List<T> body() {
return body;
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
package org.springframework.cloud.function.web.flux.response;
import org.reactivestreams.Publisher;
@@ -41,7 +41,8 @@ class FluxResponseBodyEmitter<T> extends ResponseBodyEmitter {
public FluxResponseBodyEmitter(MediaType mediaType, Publisher<T> observable) {
super();
this.mediaType = mediaType;
new ResponseBodyEmitterSubscriber<>(mediaType, observable, this);
new ResponseBodyEmitterSubscriber<>(mediaType, observable, this,
MediaType.APPLICATION_JSON.isCompatibleWith(mediaType));
}
@Override

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
package org.springframework.cloud.function.web.flux.response;
import org.reactivestreams.Publisher;
@@ -33,12 +33,12 @@ import reactor.core.publisher.Flux;
class FluxResponseSseEmitter<T> extends SseEmitter {
public FluxResponseSseEmitter(Publisher<T> observable) {
this(MediaType.valueOf("text/event-stream"), observable);
this(MediaType.valueOf("text/plain"), observable);
}
public FluxResponseSseEmitter(MediaType mediaType, Publisher<T> observable) {
super();
new ResponseBodyEmitterSubscriber<>(mediaType, observable, this);
new ResponseBodyEmitterSubscriber<>(mediaType, observable, this, false);
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
package org.springframework.cloud.function.web.flux.response;
import java.time.Duration;
import java.util.List;
@@ -65,13 +65,19 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand
@Override
public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) {
return returnValue != null && supportsReturnType(returnType);
if (returnValue != null) {
return supportsReturnType(returnType);
}
return false;
}
@Override
public boolean supportsReturnType(MethodParameter returnType) {
return Publisher.class.isAssignableFrom(returnType.getParameterType())
|| isResponseEntity(returnType);
return (returnType.getParameterType() != null
&& (Publisher.class.isAssignableFrom(returnType.getParameterType())
|| isResponseEntity(returnType)))
|| Publisher.class
.isAssignableFrom(returnType.getMethod().getReturnType());
}
private boolean isResponseEntity(MethodParameter returnType) {
@@ -87,6 +93,12 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand
public void handleReturnValue(Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest)
throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
Object adaptFrom = returnValue;
if (returnValue instanceof ResponseEntity) {
ResponseEntity<?> value = (ResponseEntity<?>) returnValue;
@@ -96,9 +108,19 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand
}
Publisher<?> flux = (Publisher<?>) adaptFrom;
MediaType mediaType = webRequest.getHeader("Accept") == null ? null
: MediaType.parseMediaTypes(webRequest.getHeader("Accept")).iterator()
.next();
MediaType mediaType = null;
if (webRequest.getHeader("Accept") != null) {
for (MediaType type : MediaType
.parseMediaTypes(webRequest.getHeader("Accept"))) {
if (!MediaType.ALL.equals(type)
&& MediaType.APPLICATION_JSON.isCompatibleWith(type)) {
mediaType = MediaType.APPLICATION_JSON;
break;
} else if (mediaType==null) {
mediaType = type;
}
}
}
delegate.handleReturnValue(getEmitter(timeout, flux, mediaType), returnType,
mavContainer, webRequest);
}
@@ -109,7 +131,8 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand
: Flux.from(flux).timeout(Duration.ofMillis(timeout), Flux.empty());
if (!MediaType.ALL.equals(mediaType)
&& EVENT_STREAM.isCompatibleWith(mediaType)) {
return new FluxResponseSseEmitter<>(mediaType, exported);
// TODO: more subtle content negotiation
return new FluxResponseSseEmitter<>(MediaType.APPLICATION_JSON, exported);
}
return new FluxResponseBodyEmitter<>(mediaType, exported);
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
package org.springframework.cloud.function.web.flux.response;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@@ -49,11 +49,14 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T> {
private boolean single;
private boolean json;
public ResponseBodyEmitterSubscriber(MediaType mediaType, Publisher<T> observable,
ResponseBodyEmitter responseBodyEmitter) {
ResponseBodyEmitter responseBodyEmitter, boolean json) {
this.mediaType = mediaType;
this.responseBodyEmitter = responseBodyEmitter;
this.json = json;
this.responseBodyEmitter.onTimeout(new Timeout());
this.responseBodyEmitter.onCompletion(new Complete());
this.single = observable instanceof Mono;
@@ -72,8 +75,7 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T> {
Object object = value;
try {
if (!MediaType.ALL.equals(mediaType)
&& MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
if (isJson()) {
if (!this.firstElementWritten) {
if (!single) {
responseBodyEmitter.send("[");
@@ -83,7 +85,7 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T> {
else {
responseBodyEmitter.send(",");
}
if (value.getClass() == String.class
if (!single && value.getClass() == String.class
&& !((String) value).contains("\"")) {
object = "\"" + value + "\"";
}
@@ -104,8 +106,7 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T> {
if (!completed) {
completed = true;
try {
if (!MediaType.ALL.equals(mediaType)
&& MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
if (isJson()) {
if (!single) {
if (!this.firstElementWritten) {
responseBodyEmitter.send("[]");
@@ -133,8 +134,7 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T> {
if (!completed) {
completed = true;
try {
if (!MediaType.ALL.equals(mediaType)
&& MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
if (isJson()) {
if (!single) {
if (!this.firstElementWritten) {
responseBodyEmitter.send("[");
@@ -150,6 +150,10 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T> {
}
}
private boolean isJson() {
return json;
}
class Complete implements Runnable {
@Override

View File

@@ -1,3 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.function.web.FunctionHandlerMapping,\
org.springframework.cloud.function.web.flux.ReactorAutoConfiguration

View File

@@ -0,0 +1,397 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.mvc;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.embedded.LocalServerPort;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.cloud.function.mvc.MvcRestApplicationTests.TestConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Tests for vanilla MVC handling (no function layer). Validates the MVC customizations
* that are added in this project independently of the specific concerns of function.
*
* @author Dave Syer
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class MvcRestApplicationTests {
private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream");
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate rest;
@Autowired
private TestConfiguration test;
@Before
public void init() {
test.list.clear();
}
@Test
public void wordsSSE() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("/words")).accept(EVENT_STREAM).build(),
String.class).getBody()).isEqualTo(sse("foo", "bar"));
}
@Test
public void wordsJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/words"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
@Ignore("Fix error handling")
public void errorJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/bang"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void words() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void foos() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/foos")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]");
}
@Test
public void getMore() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/get/more")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
@Ignore("Should this even work? Or do we need to be explicit about the JSON?")
public void updates() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.post(new URI("/updates")).body("one\ntwo"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("onetwo");
}
@Test
public void updatesJson() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/updates")).contentType(MediaType.APPLICATION_JSON)
.body("[\"one\",\"two\"]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]");
}
@Test
public void addFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/addFoos")).contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]");
}
@Test
public void timeout() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/timeout")).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void emptyJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/empty"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[]");
}
@Test
public void sentences() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/sentences")).build(), String.class)
.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
public void sentencesAcceptAny() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(),
String.class).getBody())
.isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
public void sentencesAcceptJson() throws Exception {
ResponseEntity<String> result = rest
.exchange(
RequestEntity.get(new URI("/sentences"))
.accept(MediaType.APPLICATION_JSON).build(),
String.class);
assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
assertThat(result.getHeaders().getContentType())
.isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON);
}
@Test
public void uppercase() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]");
}
@Test
public void uppercaseFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/upFoos")).contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test
public void transform() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/transform")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]");
}
@Test
public void postMore() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/post/more")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]");
}
@Test
public void uppercaseGet() {
assertThat(rest.getForObject("/uppercase/foo", String.class)).isEqualTo("[FOO]");
}
@Test
public void convertGet() {
assertThat(rest.getForObject("/wrap/123", String.class)).isEqualTo("..123..");
}
@Test
public void convertGetJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/entity/321"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("{\"value\":321}");
}
@Test
public void uppercaseJsonStream() throws Exception {
assertThat(rest
.exchange(RequestEntity.post(new URI("/maps"))
.contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class)
.getBody()).isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test
public void uppercaseSSE() throws Exception {
assertThat(rest.exchange(RequestEntity.post(new URI("/uppercase"))
.accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class).getBody())
.isEqualTo(sse("[FOO]", "[BAR]"));
}
private String sse(String... values) {
return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n";
}
@EnableAutoConfiguration
@RestController
@Configuration
public static class TestConfiguration {
private List<String> list = new ArrayList<>();
@PostMapping({ "/uppercase", "/transform", "/post/more" })
public Flux<?> uppercase(@RequestBody List<String> flux) {
return Flux.fromIterable(flux).log()
.map(value -> "[" + value.trim().toUpperCase() + "]");
}
@PostMapping("/upFoos")
public Flux<Foo> upFoos(@RequestBody List<Foo> list) {
return Flux.fromIterable(list).log()
.map(value -> new Foo(value.getValue().trim().toUpperCase()));
}
@GetMapping("/uppercase/{id}")
public Mono<?> uppercaseGet(@PathVariable String id) {
return Mono.just(id).map(value -> "[" + value.trim().toUpperCase() + "]");
}
@PostMapping("/wrap")
public Flux<?> wrap(@RequestBody Flux<Integer> flux) {
return flux.log().map(value -> ".." + value + "..");
}
@GetMapping("/wrap/{id}")
public Mono<?> wrapGet(@PathVariable int id) {
return Mono.just(id).log().map(value -> ".." + value + "..");
}
@GetMapping("/entity/{id}")
public Mono<Map<String, Object>> entity(@PathVariable Integer id) {
return Mono.just(id).log()
.map(value -> Collections.singletonMap("value", value));
}
@PostMapping("/maps")
public Flux<Map<String, String>> maps(
@RequestBody List<Map<String, String>> flux) {
return Flux.fromIterable(flux).map(value -> {
value.put("value", value.get("value").trim().toUpperCase());
return value;
});
}
@GetMapping({ "/words", "/get/more" })
public Flux<Object> words() {
return Flux.fromArray(new String[] { "foo", "bar" });
}
@GetMapping("/foos")
public Flux<Foo> foos() {
return Flux.just(new Foo("foo"), new Foo("bar"));
}
@PostMapping("/updates")
@ResponseStatus(HttpStatus.ACCEPTED)
public Flux<?> updates(@RequestBody List<String> list) {
Flux<String> flux = Flux.fromIterable(list).cache();
flux.subscribe(value -> this.list.add(value));
return flux;
}
@PostMapping("/addFoos")
@ResponseStatus(HttpStatus.ACCEPTED)
public Flux<Foo> addFoos(@RequestBody List<Foo> list) {
Flux<Foo> flux = Flux.fromIterable(list).cache();
flux.subscribe(value -> this.list.add(value.getValue()));
return flux;
}
@GetMapping("/bang")
public Flux<?> bang() {
return Flux.fromArray(new String[] { "foo", "bar" }).map(value -> {
if (value.equals("bar")) {
throw new RuntimeException("Bar");
}
return value;
});
}
@GetMapping("/empty")
public Flux<?> empty() {
return Flux.fromIterable(Collections.emptyList());
}
@GetMapping("/timeout")
public Flux<?> timeout() {
return Flux.defer(() -> Flux.<String>create(emitter -> {
emitter.next("foo");
}).timeout(Duration.ofMillis(100L), Flux.empty()));
}
@GetMapping("/sentences")
public Flux<List<String>> sentences() {
return Flux.just(Arrays.asList("go", "home"), Arrays.asList("come", "back"));
}
}
public static class Foo {
private String value;
public Foo(String value) {
this.value = value;
}
Foo() {
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
}

View File

@@ -56,7 +56,7 @@ public class PrefixTests {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/functions/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("foobar");
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@EnableAutoConfiguration

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.function.web;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -32,6 +33,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.embedded.LocalServerPort;
import org.springframework.boot.test.context.SpringBootTest;
@@ -57,7 +59,7 @@ import reactor.core.publisher.Flux;
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class RestApplicationTests {
private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream");
private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM;
@LocalServerPort
private int port;
@Autowired
@@ -104,7 +106,26 @@ public class RestApplicationTests {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("foobar");
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void foos() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/foos")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]");
}
@Test
public void qualifierFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/foos")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"[FOO]\"},{\"value\":\"[BAR]\"}]");
}
@Test
@@ -112,7 +133,7 @@ public class RestApplicationTests {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/get/more")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("foobar");
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
@@ -120,10 +141,11 @@ public class RestApplicationTests {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/bareWords")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("foobar");
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
@Ignore("Should this even work? Or do we need to be explicit about the JSON?")
public void updates() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.post(new URI("/updates")).body("one\ntwo"), String.class);
@@ -133,13 +155,34 @@ public class RestApplicationTests {
}
@Test
public void bareUpdates() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.post(new URI("/bareUpdates")).body("one\ntwo"),
String.class);
public void updatesJson() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/updates")).contentType(MediaType.APPLICATION_JSON)
.body("[\"one\",\"two\"]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("onetwo");
assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]");
}
@Test
public void addFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/addFoos")).contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]");
}
@Test
public void bareUpdates() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/bareUpdates")).contentType(MediaType.APPLICATION_JSON)
.body("[\"one\",\"two\"]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]");
}
@Test
@@ -162,7 +205,7 @@ public class RestApplicationTests {
public void sentences() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/sentences")).build(), String.class)
.getBody()).isEqualTo("[\"go\",\"home\"][\"come\",\"back\"]");
.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
@@ -170,7 +213,7 @@ public class RestApplicationTests {
assertThat(rest.exchange(
RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(),
String.class).getBody())
.isEqualTo("[\"go\",\"home\"][\"come\",\"back\"]");
.isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
@@ -182,7 +225,7 @@ public class RestApplicationTests {
String.class);
assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
assertThat(result.getHeaders().getContentType())
.isEqualTo(MediaType.APPLICATION_JSON);
.isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON);
}
@Test
@@ -197,27 +240,46 @@ public class RestApplicationTests {
}
@Test
public void uppercase() {
assertThat(rest.postForObject("/uppercase", "foo\nbar", String.class))
.isEqualTo("[FOO][BAR]");
public void uppercase() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]");
}
@Test
public void bareUppercase() {
assertThat(rest.postForObject("/bareUppercase", "foo\nbar", String.class))
.isEqualTo("[FOO][BAR]");
public void uppercaseFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
// TODO: does not require a content type header, but the plain MVC version
// does
.post(new URI("/upFoos")).contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test
public void transform() {
assertThat(rest.postForObject("/transform", "foo\nbar", String.class))
.isEqualTo("[FOO][BAR]");
public void bareUppercase() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/bareUppercase")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]");
}
@Test
public void postMore() {
assertThat(rest.postForObject("/post/more", "foo\nbar", String.class))
.isEqualTo("[FOO][BAR]");
public void transform() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/transform")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]");
}
@Test
public void postMore() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/post/more")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]");
}
@Test
@@ -237,7 +299,8 @@ public class RestApplicationTests {
@Test
public void supplierFirst() {
assertThat(rest.getForObject("/not/a/function", String.class)).isEqualTo("hello");
assertThat(rest.getForObject("/not/a/function", String.class))
.isEqualTo("[\"hello\"]");
}
@Test
@@ -256,26 +319,15 @@ public class RestApplicationTests {
// The new line in the middle is optional
.body("[{\"value\":\"foo\"},\n{\"value\":\"bar\"}]"),
String.class).getBody())
.isEqualTo("{\"value\":\"FOO\"}{\"value\":\"BAR\"}");
}
@Test
public void uppercaseJsonStream() throws Exception {
assertThat(rest
.exchange(RequestEntity.post(new URI("/maps"))
.contentType(MediaType.APPLICATION_JSON)
// TODO: make this work without newline separator
.body("{\"value\":\"foo\"}\n{\"value\":\"bar\"}"), String.class)
.getBody()).isEqualTo("{\"value\":\"FOO\"}{\"value\":\"BAR\"}");
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test
public void uppercaseSSE() throws Exception {
assertThat(
rest.exchange(
RequestEntity.post(new URI("/uppercase")).accept(EVENT_STREAM)
.contentType(EVENT_STREAM).body(sse("foo", "bar")),
String.class).getBody()).isEqualTo(sse("[FOO]", "[BAR]"));
assertThat(rest.exchange(RequestEntity.post(new URI("/uppercase"))
.accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class).getBody())
.isEqualTo(sse("[FOO]", "[BAR]"));
}
private String sse(String... values) {
@@ -299,6 +351,12 @@ public class RestApplicationTests {
return value -> "[" + value.trim().toUpperCase() + "]";
}
@Bean
public Function<Flux<Foo>, Flux<Foo>> upFoos() {
return flux -> flux.log()
.map(value -> new Foo(value.getValue().trim().toUpperCase()));
}
@Bean
public Function<Flux<Integer>, Flux<String>> wrap() {
return flux -> flux.log().map(value -> ".." + value + "..");
@@ -320,7 +378,18 @@ public class RestApplicationTests {
@Bean({ "words", "get/more" })
public Supplier<Flux<String>> words() {
return () -> Flux.fromArray(new String[] { "foo", "bar" });
return () -> Flux.just("foo", "bar");
}
@Bean
public Supplier<Flux<Foo>> foos() {
return () -> Flux.just(new Foo("foo"), new Foo("bar"));
}
@Bean
@Qualifier("foos")
public Function<String, Foo> qualifier() {
return value -> new Foo("[" + value.trim().toUpperCase() + "]");
}
@Bean
@@ -333,6 +402,11 @@ public class RestApplicationTests {
return flux -> flux.subscribe(value -> list.add(value));
}
@Bean
public Consumer<Flux<Foo>> addFoos() {
return flux -> flux.subscribe(value -> list.add(value.getValue()));
}
@Bean
public Consumer<String> bareUpdates() {
return value -> list.add(value);
@@ -365,9 +439,9 @@ public class RestApplicationTests {
@Bean
public Supplier<Flux<String>> timeout() {
return () -> Flux.create(emitter -> {
return () -> Flux.defer(() -> Flux.<String>create(emitter -> {
emitter.next("foo");
});
}).timeout(Duration.ofMillis(100L), Flux.empty()));
}
@Bean
@@ -378,4 +452,23 @@ public class RestApplicationTests {
}
public static class Foo {
private String value;
public Foo(String value) {
this.value = value;
}
Foo() {
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
}

View File

@@ -1,108 +0,0 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.mock.http.MockHttpInputMessage;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
public class FluxHttpMessageConverterTests {
private FluxHttpMessageConverter converter = new FluxHttpMessageConverter();
private Class<Flux<Object>> type = null;
@Test
public void newlines() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage("foo\nbar".getBytes());
assertThat(converter.read(type, message).collectList().block()).contains("foo",
"bar");
}
@Test
public void sse() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"data:foo\n\ndata:bar".getBytes());
message.getHeaders().setContentType(MediaType.valueOf("text/event-stream"));
assertThat(converter.read(type, message).collectList().block()).contains("foo",
"bar");
}
@Test
public void jsonStream() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"{\"value\":\"foo\"}{\"value\":\"barrier\"}".getBytes());
message.getHeaders().setContentType(MediaType.APPLICATION_JSON);
assertThat(converter.read(type, message).collectList().block())
.contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}");
}
@Test
public void jsonStreamWhitespace() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"{\"value\":\"foo\"} {\"value\":\"barrier\"} ".getBytes());
message.getHeaders().setContentType(MediaType.APPLICATION_JSON);
assertThat(converter.read(type, message).collectList().block())
.contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}");
}
@Test
public void jsonStreamNewline() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"{\"value\":\"foo\"}\n{\"value\":\"barrier\"}".getBytes());
message.getHeaders().setContentType(MediaType.APPLICATION_JSON);
assertThat(converter.read(type, message).collectList().block())
.contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}");
}
@Test
public void jsonArray() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"[{\"value\":\"foo\"},{\"value\":\"barrier\"}]".getBytes());
message.getHeaders().setContentType(MediaType.APPLICATION_JSON);
assertThat(converter.read(type, message).collectList().block())
.contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}");
}
@Test
public void jsonArrayWhitespace() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"[{\"value\":\"foo\"}, {\"value\":\"barrier\"}] ".getBytes());
message.getHeaders().setContentType(MediaType.APPLICATION_JSON);
assertThat(converter.read(type, message).collectList().block())
.contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}");
}
@Test
public void jsonArrayNewline() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"[{\"value\":\"foo\"},\n{\"value\":\"barrier\"}]".getBytes());
message.getHeaders().setContentType(MediaType.APPLICATION_JSON);
assertThat(converter.read(type, message).collectList().block())
.contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}");
}
}