From 4686e450b1578936af6ce06af8a13131ff54421e Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Mon, 24 Apr 2017 13:25:21 +0100 Subject: [PATCH] Alternative approach to MVC handling Doesn't rely on manipulating the FunctionCatalog, and does type conversion/coercion in the MVC layer. --- ...ntextFunctionCatalogAutoConfiguration.java | 348 ++++++--------- .../function/context/FunctionInspector.java | 33 ++ .../web/flux/FluxHttpMessageConverter.java | 323 -------------- .../web/{ => flux}/FunctionController.java | 43 +- .../{ => flux}/FunctionHandlerMapping.java | 16 +- .../web/{ => flux}/FunctionMapping.java | 2 +- .../web/flux/ReactorAutoConfiguration.java | 74 +++- .../web/flux/request/DelegateHandler.java | 46 ++ .../FluxHandlerMethodArgumentResolver.java | 82 ++++ .../web/flux/request/FluxRequest.java | 45 ++ .../FluxResponseBodyEmitter.java | 5 +- .../FluxResponseSseEmitter.java | 6 +- .../FluxReturnValueHandler.java | 39 +- .../ResponseBodyEmitterSubscriber.java | 22 +- .../main/resources/META-INF/spring.factories | 1 - .../function/mvc/MvcRestApplicationTests.java | 397 ++++++++++++++++++ .../cloud/function/web/PrefixTests.java | 2 +- .../function/web/RestApplicationTests.java | 181 ++++++-- .../flux/FluxHttpMessageConverterTests.java | 108 ----- 19 files changed, 1023 insertions(+), 750 deletions(-) create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java delete mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverter.java rename spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/{ => flux}/FunctionController.java (65%) rename spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/{ => flux}/FunctionHandlerMapping.java (86%) rename spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/{ => flux}/FunctionMapping.java (95%) create mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/DelegateHandler.java create mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java create mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxRequest.java rename spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/{ => response}/FluxResponseBodyEmitter.java (93%) rename spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/{ => response}/FluxResponseSseEmitter.java (91%) rename spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/{ => response}/FluxReturnValueHandler.java (79%) rename spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/{ => response}/ResponseBodyEmitterSubscriber.java (88%) create mode 100644 spring-cloud-function-web/src/test/java/org/springframework/cloud/function/mvc/MvcRestApplicationTests.java delete mode 100644 spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverterTests.java diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java index 5b9a6d12f..90fc4fd11 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java @@ -31,14 +31,11 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import com.fasterxml.jackson.databind.ObjectMapper; -import reactor.core.publisher.Flux; - import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanDefinition; -import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionRegistry; @@ -54,12 +51,16 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.ResolvableType; import org.springframework.core.annotation.AnnotatedElementUtils; +import org.springframework.core.convert.ConversionService; +import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.core.io.FileSystemResource; import org.springframework.core.type.StandardMethodMetadata; import org.springframework.stereotype.Component; import org.springframework.util.ClassUtils; import org.springframework.util.ReflectionUtils; +import reactor.core.publisher.Flux; + /** * @author Dave Syer * @author Mark Fisher @@ -82,31 +83,83 @@ public class ContextFunctionCatalogAutoConfiguration { private Map> registrations = Collections.emptyMap(); @Bean - public FunctionCatalog functionCatalog(ContextFunctionPostProcessor processor, - ObjectMapper mapper) { + public FunctionCatalog functionCatalog(ContextFunctionPostProcessor processor) { return new InMemoryFunctionCatalog( - processor.merge(registrations, consumers, suppliers, functions, mapper)); + processor.merge(registrations, consumers, suppliers, functions)); + } + + @Bean + public FunctionInspector functionInspector(ContextFunctionPostProcessor processor) { + return new BeanFactoryFunctionInspector(processor); + } + + protected class BeanFactoryFunctionInspector implements FunctionInspector { + + private ContextFunctionPostProcessor processor; + + public BeanFactoryFunctionInspector(ContextFunctionPostProcessor processor) { + this.processor = processor; + } + + @Override + public Class getInputType(String name) { + return processor.findInputType(name); + } + + @Override + public Class getOutputType(String name) { + return processor.findOutputType(name); + } + + @Override + public Object convert(String name, String value) { + return processor.convert(name, value); + } + + @Override + public String getName(Object function) { + return processor.registrations.get(function); + } + } @Component - public static class ContextFunctionPostProcessor - implements BeanFactoryPostProcessor, BeanDefinitionRegistryPostProcessor { + protected static class ContextFunctionPostProcessor + implements BeanDefinitionRegistryPostProcessor { private Set suppliers = new HashSet<>(); private Set functions = new HashSet<>(); private Set consumers = new HashSet<>(); private BeanDefinitionRegistry registry; + private ConversionService conversionService; + private Map registrations = new HashMap<>(); @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) { this.registry = registry; } + public Object convert(String name, String value) { + if (conversionService==null) { + if (registry instanceof ConfigurableListableBeanFactory) { + ConversionService conversionService = ((ConfigurableBeanFactory) this.registry).getConversionService(); + if (conversionService != null) { + this.conversionService = conversionService; + } + else { + this.conversionService = new DefaultConversionService(); + } + } + } + Class type = findInputType(name); + return conversionService.canConvert(String.class, type) ? conversionService.convert(value, type) : value; + } + public Set> merge( Map> initial, Map> consumers, Map> suppliers, - Map> functions, ObjectMapper mapper) { + Map> functions) { Set> registrations = new HashSet<>(); Map targets = new HashMap<>(); // Replace the initial registrations with new ones that have the right names @@ -150,7 +203,7 @@ public class ContextFunctionCatalogAutoConfiguration { @SuppressWarnings("unchecked") FunctionRegistration target = (FunctionRegistration) registration; String key = targets.get(target.getTarget()); - wrap(target, mapper, key); + wrap(target, key); } return registrations; } @@ -168,18 +221,18 @@ public class ContextFunctionCatalogAutoConfiguration { return names; } - private void wrap(FunctionRegistration registration, - ObjectMapper mapper, String key) { + private void wrap(FunctionRegistration registration, String key) { Object target = registration.getTarget(); if (target instanceof Supplier) { - registration.target(target((Supplier) target, mapper, key)); + registration.target(target((Supplier) target, key)); } else if (target instanceof Consumer) { - registration.target(target((Consumer) target, mapper, key)); + registration.target(target((Consumer) target, key)); } else if (target instanceof Function) { - registration.target(target((Function) target, mapper, key)); + registration.target(target((Function) target, key)); } + this.registrations.put(registration.getTarget(), key); } private String getQualifier(String key) { @@ -200,49 +253,48 @@ public class ContextFunctionCatalogAutoConfiguration { return value; } - private Supplier target(Supplier target, ObjectMapper mapper, String key) { + private Supplier target(Supplier target, String key) { if (this.suppliers.contains(key)) { @SuppressWarnings("unchecked") Supplier> supplier = (Supplier>) target; - return wrapSupplier(supplier, mapper, key); + return supplier; } else if (!isFluxSupplier(key, target)) { @SuppressWarnings({ "unchecked", "rawtypes" }) FluxSupplier value = new FluxSupplier(target); - return wrapSupplier(value, mapper, key); + return value; } else { return target; } } - private Function target(Function target, ObjectMapper mapper, - String key) { + private Function target(Function target, String key) { if (this.functions.contains(key)) { @SuppressWarnings("unchecked") Function, Flux> function = (Function, Flux>) target; - return wrapFunction(function, mapper, key); + return function; } else if (!isFluxFunction(key, target)) { @SuppressWarnings({ "unchecked", "rawtypes" }) FluxFunction value = new FluxFunction(target); - return wrapFunction(value, mapper, key); + return value; } else { return target; } } - private Consumer target(Consumer target, ObjectMapper mapper, String key) { + private Consumer target(Consumer target, String key) { if (this.consumers.contains(key)) { @SuppressWarnings("unchecked") Consumer> consumer = (Consumer>) target; - return wrapConsumer(consumer, mapper, key); + return consumer; } else if (!isFluxConsumer(key, target)) { @SuppressWarnings({ "unchecked", "rawtypes" }) FluxConsumer value = new FluxConsumer(target); - return wrapConsumer(value, mapper, key); + return value; } else { return target; @@ -342,201 +394,69 @@ public class ContextFunctionCatalogAutoConfiguration { String.class))); } - private ProxySupplier wrapSupplier(Supplier> supplier, - ObjectMapper mapper, String name) { - ProxySupplier wrapped = new ProxySupplier(mapper); - wrapped.setDelegate(supplier); - wrapped.setOutputType(findType(name)); - return wrapped; - } - - private ProxyFunction wrapFunction(Function, Flux> function, - ObjectMapper mapper, String name) { - ProxyFunction wrapped = new ProxyFunction(mapper); - wrapped.setDelegate(function); - wrapped.setInputType(findType(name)); - wrapped.setOutputType(findOutputType(name)); - return wrapped; - } - - private ProxyConsumer wrapConsumer(Consumer> consumer, - ObjectMapper mapper, String name) { - ProxyConsumer wrapped = new ProxyConsumer(mapper); - wrapped.setDelegate(consumer); - wrapped.setInputType(findType(name)); - return wrapped; - } - private Class findType(AbstractBeanDefinition definition, int index) { - Object source = definition.getSource(); - Type param; - if (source instanceof StandardMethodMetadata) { - ParameterizedType type; - type = (ParameterizedType) ((StandardMethodMetadata) source).getIntrospectedMethod() - .getGenericReturnType(); - Type typeArgumentAtIndex = type.getActualTypeArguments()[index]; - if (typeArgumentAtIndex instanceof ParameterizedType) { - param = ((ParameterizedType) typeArgumentAtIndex).getActualTypeArguments()[0]; - } - else { - param = typeArgumentAtIndex; - } - } - else if (source instanceof FileSystemResource) { - try { - Type type = ClassUtils.forName(definition.getBeanClassName(), null); - if (type instanceof ParameterizedType) { - Type typeArgumentAtIndex = ((ParameterizedType)type).getActualTypeArguments()[index]; - if (typeArgumentAtIndex instanceof ParameterizedType) { - param = ((ParameterizedType) typeArgumentAtIndex).getActualTypeArguments()[0]; - } else { - param = typeArgumentAtIndex; - } - } - else { - param = type; - } - } catch (ClassNotFoundException e) { - throw new IllegalStateException("Cannot instrospect bean: " + definition, e); - } + Object source = definition.getSource(); + Type param; + if (source instanceof StandardMethodMetadata) { + ParameterizedType type; + type = (ParameterizedType) ((StandardMethodMetadata) source).getIntrospectedMethod() + .getGenericReturnType(); + Type typeArgumentAtIndex = type.getActualTypeArguments()[index]; + if (typeArgumentAtIndex instanceof ParameterizedType) { + param = ((ParameterizedType) typeArgumentAtIndex).getActualTypeArguments()[0]; } else { - ResolvableType resolvable = (ResolvableType) getField(definition, - "targetType"); - param = resolvable.getGeneric(index).getGeneric(0).getType(); + param = typeArgumentAtIndex; } - if (param instanceof ParameterizedType) { - ParameterizedType concrete = (ParameterizedType) param; - param = concrete.getRawType(); + } + else if (source instanceof FileSystemResource) { + try { + Type type = ClassUtils.forName(definition.getBeanClassName(), null); + if (type instanceof ParameterizedType) { + Type typeArgumentAtIndex = ((ParameterizedType)type).getActualTypeArguments()[index]; + if (typeArgumentAtIndex instanceof ParameterizedType) { + param = ((ParameterizedType) typeArgumentAtIndex).getActualTypeArguments()[0]; + } else { + param = typeArgumentAtIndex; + } + } + else { + param = type; + } + } catch (ClassNotFoundException e) { + throw new IllegalStateException("Cannot instrospect bean: " + definition, e); } - return ClassUtils.resolveClassName(param.getTypeName(), - registry.getClass().getClassLoader()); } - - private Object getField(Object target, String name) { - Field field = ReflectionUtils.findField(target.getClass(), name); - ReflectionUtils.makeAccessible(field); - return ReflectionUtils.getField(field, target); + else { + ResolvableType resolvable = (ResolvableType) getField(definition, + "targetType"); + param = resolvable.getGeneric(index).getGeneric(0).getType(); } - - private Class findType(String name) { - return findType((AbstractBeanDefinition) registry.getBeanDefinition(name), 0); + if (param instanceof ParameterizedType) { + ParameterizedType concrete = (ParameterizedType) param; + param = concrete.getRawType(); } - - private Class findOutputType(String name) { - return findType((AbstractBeanDefinition) registry.getBeanDefinition(name), 1); - } - - } -} - -abstract class ProxyWrapper { - - private ObjectMapper mapper; - - private T delegate; - - private Class inputType; - - private Class outputType; - - public ProxyWrapper(ObjectMapper mapper) { - this.mapper = mapper; - } - - public void setDelegate(T delegate) { - this.delegate = delegate; - } - - public T getDelegate() { - return delegate; - } - - public void setInputType(Class inputType) { - this.inputType = inputType; - } - - public void setOutputType(Class outputType) { - this.outputType = outputType; - } - - public Class getInputType() { - return this.inputType; - } - - public Class getOutputType() { - return outputType; - } - - public Object fromJson(String value) { - if (getInputType().equals(String.class)) { - return value; - } - try { - return mapper.readValue(value, getInputType()); - } - catch (Exception e) { - throw new IllegalStateException("Cannot convert from JSON: " + value); - } - } - - public String toJson(Object value) { - if (String.class.equals(getOutputType()) && value instanceof String) { - return (String) value; - } - try { - return mapper.writeValueAsString(value); - } - catch (Exception e) { - throw new IllegalStateException("Cannot convert to JSON: " + value); - } - } - - @Override - public String toString() { - return "ProxyWrapper [delegate=" + delegate + ", inputType=" + inputType + "]"; - } - -} - -class ProxySupplier extends ProxyWrapper>> - implements Supplier> { - - @Autowired - public ProxySupplier(ObjectMapper mapper) { - super(mapper); - } - - @Override - public Flux get() { - return getDelegate().get().map(this::toJson); - } -} - -class ProxyFunction extends ProxyWrapper, Flux>> - implements Function, Flux> { - - @Autowired - public ProxyFunction(ObjectMapper mapper) { - super(mapper); - } - - @Override - public Flux apply(Flux input) { - return getDelegate().apply(input.map(this::fromJson)).map(this::toJson); - } -} - -class ProxyConsumer extends ProxyWrapper>> - implements Consumer> { - - @Autowired - public ProxyConsumer(ObjectMapper mapper) { - super(mapper); - } - - @Override - public void accept(Flux input) { - getDelegate().accept(input.map(this::fromJson)); + return ClassUtils.resolveClassName(param.getTypeName(), + registry.getClass().getClassLoader()); } + + private Object getField(Object target, String name) { + Field field = ReflectionUtils.findField(target.getClass(), name); + ReflectionUtils.makeAccessible(field); + return ReflectionUtils.getField(field, target); + } + + private Class findInputType(String name) { + if (!registry.containsBeanDefinition(name)) { + return Object.class; + } + return findType((AbstractBeanDefinition) registry.getBeanDefinition(name), 0); + } + + private Class findOutputType(String name) { + if (!registry.containsBeanDefinition(name)) { + return Object.class; + } + return findType((AbstractBeanDefinition) registry.getBeanDefinition(name), 1); + } } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java new file mode 100644 index 000000000..406b8da3e --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java @@ -0,0 +1,33 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context; + +/** + * @author Dave Syer + * + */ +public interface FunctionInspector { + + Class getInputType(String name); + + Class getOutputType(String name); + + Object convert(String name, String value); + + String getName(Object function); + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverter.java deleted file mode 100644 index d760f954c..000000000 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverter.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Copyright 2012-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.web.flux; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.springframework.http.HttpInputMessage; -import org.springframework.http.HttpOutputMessage; -import org.springframework.http.MediaType; -import org.springframework.http.converter.HttpMessageConverter; -import org.springframework.http.converter.HttpMessageNotReadableException; -import org.springframework.http.converter.HttpMessageNotWritableException; - -import reactor.core.publisher.Flux; - -/** - * Converter for request bodies of type Flux. - * - * @author Dave Syer - * - */ -public class FluxHttpMessageConverter implements HttpMessageConverter> { - - private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); - - @Override - public boolean canRead(Class clazz, MediaType mediaType) { - return Flux.class.isAssignableFrom(clazz); - } - - @Override - public boolean canWrite(Class clazz, MediaType mediaType) { - return false; - } - - @Override - public List getSupportedMediaTypes() { - return Arrays.asList(MediaType.ALL); - } - - @Override - public Flux read(Class> clazz, - HttpInputMessage inputMessage) - throws IOException, HttpMessageNotReadableException { - - MediaType mediaType = inputMessage.getHeaders().getContentType(); - if (mediaType != null) { - if (!MediaType.ALL.equals(mediaType) && mediaType.includes(MediaType.APPLICATION_JSON)) { - return new JsonObjectDecoder().decode(inputMessage.getBody()); - } - if (mediaType.includes(EVENT_STREAM)) { - return splitOnSseData(inputMessage); - } - } - - return splitOnLineEndings(inputMessage); - } - - private Flux splitOnLineEndings(HttpInputMessage inputMessage) { - return Flux.create(sink -> { - BufferedReader reader; - try { - reader = new BufferedReader( - new InputStreamReader(inputMessage.getBody())); - String line = reader.readLine(); - while (line != null) { - sink.next(line); - line = reader.readLine(); - } - } - catch (IOException e) { - sink.error(e); - } - sink.complete(); - }); - } - - private Flux splitOnSseData(HttpInputMessage inputMessage) { - return Flux.create(sink -> { - BufferedReader reader; - StringBuffer buffer = new StringBuffer(); - int emptyCount = 0; - try { - reader = new BufferedReader( - new InputStreamReader(inputMessage.getBody())); - String line = reader.readLine(); - while (line != null) { - if (line.length() == 0) { - emptyCount++; - } - else { - if (buffer.length() == 0) { - if (line.startsWith("data:")) { - line = line.length() > "data:".length() - ? line.substring("data:".length()) : ""; - } - } - else { - buffer.append("\n"); - } - buffer.append(line); - } - if (emptyCount > 0) { - sink.next(buffer.toString()); - buffer.setLength(0); - emptyCount = 0; - while (line != null && line.length() == 0) { - line = reader.readLine(); - } - } - else { - line = reader.readLine(); - } - } - if (buffer.length()>0) { - sink.next(buffer.toString()); - } - } - catch (IOException e) { - sink.error(e); - } - sink.complete(); - }); - } - - @Override - public void write(Flux t, MediaType contentType, - HttpOutputMessage outputMessage) - throws IOException, HttpMessageNotWritableException { - } - - static class JsonObjectDecoder { - - private static final int ST_CORRUPTED = -1; - - private static final int ST_INIT = 0; - - private static final int ST_DECODING_NORMAL = 1; - - private static final int ST_DECODING_ARRAY_STREAM = 2; - - private final int maxObjectLength = 1024 * 1024; - - private int openBraces; - private int state; - private boolean insideString; - private int writerIndex; - private boolean streamArrayElements = true; - - public Flux decode(InputStream body) { - InputStreamReader reader = new InputStreamReader(body); - char[] buffer = new char[1024]; - try { - List chunks = new ArrayList<>(); - int read = reader.read(buffer); - this.writerIndex += read; - while (read >= 0) { - if (this.state == ST_CORRUPTED) { - return Flux.error(new IllegalStateException("Corrupted stream")); - } - if (this.writerIndex > maxObjectLength) { - // buffer size exceeded maxObjectLength; discarding the complete - // buffer. - reset(); - return Flux.error(new IllegalStateException( - "object length exceeds " + maxObjectLength + ": " - + this.writerIndex + " bytes discarded")); - } - int point = 0; - for (int index = 0; index < read; index++) { - char c = buffer[index]; - if (this.state == ST_DECODING_NORMAL) { - decodeByte(c, buffer, index); - - // All opening braces/brackets have been closed. That's enough - // to conclude that the JSON object/array is complete. - if (this.openBraces == 0) { - char[] json = extractObject(buffer, point, - index + 1 - point); - if (json != null) { - chunks.add(new String(json)); - } - - // The JSON object/array was extracted => discard the - // bytes from the input buffer. - point += index + 1 - point; - // Reset the object state to get ready for the next JSON - // object/text coming along the byte stream. - reset(); - } - } - else if (this.state == ST_DECODING_ARRAY_STREAM) { - decodeByte(c, buffer, index); - - if (!this.insideString && (this.openBraces == 1 && c == ',' - || this.openBraces == 0 && c == ']')) { - // skip leading spaces. No range check is needed and the - // loop will terminate because the byte at position index - // is not a whitespace. - for (int i = point; Character - .isWhitespace(buffer[i]); i++) { - point++; - } - - // skip trailing spaces. - int idxNoSpaces = index - 1; - while (idxNoSpaces >= 0 - && Character.isWhitespace(buffer[idxNoSpaces])) { - idxNoSpaces--; - } - - char[] json = extractObject(buffer, point, - idxNoSpaces + 1 - point); - if (json != null) { - chunks.add(new String(json)); - } - - point += index + 1 - point; - - if (c == ']') { - reset(); - } - } - // JSON object/array detected. Accumulate bytes until all - // braces/brackets are closed. - } - else if (c == '{' || c == '[') { - initDecoding(c, this.streamArrayElements); - - if (this.state == ST_DECODING_ARRAY_STREAM) { - // Discard the array bracket - point++; - } - // Discard leading spaces in front of a JSON object/array. - } - else if (Character.isWhitespace(c)) { - point++; - } - else { - this.state = ST_CORRUPTED; - return Flux.error(new IllegalStateException( - "invalid JSON received at byte position " - + writerIndex)); - } - } - read = reader.read(buffer); - } - - return Flux.fromIterable(chunks); - } - catch (IOException e) { - return Flux.error(new IllegalStateException("Cannot read stream", e)); - } - } - - private char[] extractObject(char[] buffer, int index, int length) { - if (length <= 0) { - return null; - } - return Arrays.copyOfRange(buffer, index, index + length); - } - - private void decodeByte(char c, char[] input, int index) { - if ((c == '{' || c == '[') && !this.insideString) { - this.openBraces++; - } - else if ((c == '}' || c == ']') && !this.insideString) { - this.openBraces--; - } - else if (c == '"') { - // start of a new JSON string. It's necessary to detect strings as they - // may also contain braces/brackets and that could lead to incorrect - // results. - if (!this.insideString) { - this.insideString = true; - // If the double quote wasn't escaped then this is the end of a - // string. - } - else if (input[index - 1] != '\\') { - this.insideString = false; - } - } - } - - private void initDecoding(char openingBrace, boolean streamArrayElements) { - this.openBraces = 1; - if (openingBrace == '[' && streamArrayElements) { - this.state = ST_DECODING_ARRAY_STREAM; - } - else { - this.state = ST_DECODING_NORMAL; - } - } - - private void reset() { - this.insideString = false; - this.state = ST_INIT; - this.openBraces = 0; - } - - } - -} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java similarity index 65% rename from spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java rename to spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java index 891b41bed..a3096472d 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java @@ -14,15 +14,17 @@ * limitations under the License. */ -package org.springframework.cloud.function.web; +package org.springframework.cloud.function.web.flux; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.function.context.FunctionInspector; import org.springframework.cloud.function.support.FluxSupplier; import org.springframework.cloud.function.support.FunctionUtils; +import org.springframework.cloud.function.web.flux.request.FluxRequest; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; @@ -42,25 +44,30 @@ import reactor.core.publisher.Mono; */ @Component public class FunctionController { + + private FunctionInspector inspector; @Value("${debug:${DEBUG:false}}") private boolean debug = false; + public FunctionController(FunctionInspector inspector) { + this.inspector = inspector; + } + @PostMapping(path = "/**") @ResponseBody - public ResponseEntity> post( - @RequestAttribute(required = false, name = "org.springframework.cloud.function.web.FunctionHandlerMapping.function") Function, Flux> function, - @RequestAttribute(required = false, name = "org.springframework.cloud.function.web.FunctionHandlerMapping.consumer") Consumer> consumer, - @RequestBody Flux body) { + public ResponseEntity> post( + @RequestAttribute(required = false, name = "org.springframework.cloud.function.web.flux.FunctionHandlerMapping.function") Function, Flux> function, + @RequestAttribute(required = false, name = "org.springframework.cloud.function.web.flux.FunctionHandlerMapping.consumer") Consumer> consumer, + @RequestBody FluxRequest body) { if (function != null) { - @SuppressWarnings("unchecked") - Flux result = (Flux) function.apply(body); + Flux result = (Flux) function.apply(body.flux()); return ResponseEntity.ok().body(debug ? result.log() : result); } if (consumer != null) { - body = body.cache(); // send a copy back to the caller - consumer.accept(body); - return ResponseEntity.status(HttpStatus.ACCEPTED).body(body); + Flux flux = body.flux().cache(); // send a copy back to the caller + consumer.accept(flux); + return ResponseEntity.status(HttpStatus.ACCEPTED).body(flux); } throw new IllegalArgumentException("no such function"); } @@ -68,9 +75,9 @@ public class FunctionController { @GetMapping(path = "/**") @ResponseBody public Object get( - @RequestAttribute(required = false, name = "org.springframework.cloud.function.web.FunctionHandlerMapping.function") Function, Flux> function, - @RequestAttribute(required = false, name = "org.springframework.cloud.function.web.FunctionHandlerMapping.supplier") Supplier> supplier, - @RequestAttribute(required = false, name = "org.springframework.cloud.function.web.FunctionHandlerMapping.argument") String argument) { + @RequestAttribute(required = false, name = "org.springframework.cloud.function.web.flux.FunctionHandlerMapping.function") Function, Flux> function, + @RequestAttribute(required = false, name = "org.springframework.cloud.function.web.flux.FunctionHandlerMapping.supplier") Supplier> supplier, + @RequestAttribute(required = false, name = "org.springframework.cloud.function.web.flux.FunctionHandlerMapping.argument") String argument) { if (function != null) { return value(function, argument); } @@ -78,18 +85,18 @@ public class FunctionController { } @SuppressWarnings({ "unchecked", "rawtypes" }) - private Flux supplier(Supplier> supplier) { + private Flux supplier(Supplier> supplier) { if (!FunctionUtils.isFluxSupplier(supplier)) { supplier = new FluxSupplier(supplier); } - Flux result = (Flux) supplier.get(); + Flux result = supplier.get(); return debug ? result.log() : result; } - private Mono value(Function, Flux> function, + private Mono value(Function, Flux> function, @PathVariable String value) { - @SuppressWarnings({ "unchecked" }) - Mono result = Mono.from((Flux) function.apply(Flux.just(value))); + Object input = inspector.convert(inspector.getName(function), value); + Mono result = Mono.from(function.apply(Flux.just(input))); return debug ? result.log() : result; } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionHandlerMapping.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java similarity index 86% rename from spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionHandlerMapping.java rename to spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java index 4e43a1bb9..28a3e12a9 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionHandlerMapping.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.cloud.function.web; +package org.springframework.cloud.function.web.flux; import java.util.function.Consumer; import java.util.function.Function; @@ -26,7 +26,9 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.cloud.function.context.FunctionInspector; import org.springframework.cloud.function.registry.FunctionCatalog; +import org.springframework.cloud.function.web.flux.request.FluxHandlerMethodArgumentResolver; import org.springframework.context.annotation.Configuration; import org.springframework.web.method.HandlerMethod; import org.springframework.web.servlet.HandlerMapping; @@ -57,10 +59,10 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping private String prefix = ""; @Autowired - public FunctionHandlerMapping(FunctionCatalog catalog) { + public FunctionHandlerMapping(FunctionCatalog catalog, FunctionInspector inspector) { this.functions = catalog; setOrder(super.getOrder() - 5); - this.controller = new FunctionController(); + this.controller = new FunctionController(inspector); } @Override @@ -87,10 +89,14 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping if (path == null) { return handler; } - if (findFunctionForGet(request, path) != null) { + Object function = findFunctionForGet(request, path); + if (function != null) { + request.setAttribute(FluxHandlerMethodArgumentResolver.HANDLER, function); return handler; } - if (findFunctionForPost(request, path) != null) { + function = findFunctionForPost(request, path); + if (function != null) { + request.setAttribute(FluxHandlerMethodArgumentResolver.HANDLER, function); return handler; } return null; diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionMapping.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionMapping.java similarity index 95% rename from spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionMapping.java rename to spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionMapping.java index 2d3343df4..dcdcda92f 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionMapping.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionMapping.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.cloud.function.web; +package org.springframework.cloud.function.web.flux; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java index d1b3f9cc7..87dbd6274 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java @@ -16,20 +16,32 @@ package org.springframework.cloud.function.web.flux; +import java.util.ArrayList; import java.util.List; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication; import org.springframework.boot.autoconfigure.web.HttpMessageConverters; +import org.springframework.cloud.function.context.FunctionInspector; +import org.springframework.cloud.function.registry.FunctionCatalog; +import org.springframework.cloud.function.web.flux.request.FluxHandlerMethodArgumentResolver; +import org.springframework.cloud.function.web.flux.response.FluxReturnValueHandler; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.http.converter.ObjectToStringHttpMessageConverter; +import org.springframework.util.ClassUtils; import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; +import org.springframework.web.method.support.HandlerMethodArgumentResolver; import org.springframework.web.method.support.HandlerMethodReturnValueHandler; -import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter; +import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter; import reactor.core.publisher.Flux; @@ -40,7 +52,7 @@ import reactor.core.publisher.Flux; @Configuration @ConditionalOnWebApplication @ConditionalOnClass({ Flux.class, AsyncHandlerMethodReturnValueHandler.class }) -public class ReactorAutoConfiguration extends WebMvcConfigurerAdapter { +public class ReactorAutoConfiguration { @Autowired private ApplicationContext context; @@ -51,23 +63,59 @@ public class ReactorAutoConfiguration extends WebMvcConfigurerAdapter { } @Bean - public FluxReturnValueHandler fluxReturnValueHandler( - HttpMessageConverters converters) { - return new FluxReturnValueHandler(converters.getConverters()); + public FunctionHandlerMapping functionHandlerMapping(FunctionCatalog catalog, + FunctionInspector inspector) { + return new FunctionHandlerMapping(catalog, inspector); } @Configuration - protected static class FluxMessageConverterConfiguration { - + @ConditionalOnMissingClass("org.springframework.core.ReactiveAdapter") + protected static class FluxReturnValueConfiguration { @Bean - public FluxHttpMessageConverter fluxHttpMessageConverter() { - return new FluxHttpMessageConverter(); + public FluxReturnValueHandler fluxReturnValueHandler( + HttpMessageConverters converters) { + return new FluxReturnValueHandler(converters.getConverters()); } } - @Override - public void addReturnValueHandlers( - List returnValueHandlers) { - returnValueHandlers.add(context.getBean(FluxReturnValueHandler.class)); + @Configuration + protected static class FluxArgumentResolverConfiguration { + @Bean + public FluxHandlerMethodArgumentResolver fluxHandlerMethodArgumentResolver( + FunctionInspector inspector, ObjectMapper mapper) { + return new FluxHandlerMethodArgumentResolver(inspector, mapper); + } + } + + @Bean + public BeanPostProcessor fluxRequestMappingHandlerAdapterProcessor() { + return new BeanPostProcessor() { + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) + throws BeansException { + if (bean instanceof RequestMappingHandlerAdapter) { + RequestMappingHandlerAdapter adapter = (RequestMappingHandlerAdapter) bean; + List resolvers = new ArrayList<>( + adapter.getArgumentResolvers()); + resolvers.add(0, + context.getBean(FluxHandlerMethodArgumentResolver.class)); + adapter.setArgumentResolvers(resolvers); + if (!ClassUtils.isPresent("org.springframework.core.ReactiveAdapter", + null)) { + List handlers = new ArrayList<>( + adapter.getReturnValueHandlers()); + handlers.add(0, context.getBean(FluxReturnValueHandler.class)); + adapter.setReturnValueHandlers(handlers); + } + } + return bean; + } + + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) + throws BeansException { + return bean; + } + }; } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/DelegateHandler.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/DelegateHandler.java new file mode 100644 index 000000000..4abcede17 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/DelegateHandler.java @@ -0,0 +1,46 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux.request; + +import org.springframework.beans.factory.ListableBeanFactory; +import org.springframework.cloud.function.context.FunctionInspector; + +public abstract class DelegateHandler { + + private final ListableBeanFactory factory; + private FunctionInspector processor; + private final Object source; + + public DelegateHandler(ListableBeanFactory factory, Object source) { + this.factory = factory; + this.source = source; + } + + public Class type() { + String name = source instanceof String ? (String) source + : processor.getName(source); + return (Class) processor().getInputType(name); + } + + private FunctionInspector processor() { + if (processor == null) { + processor = factory.getBean(FunctionInspector.class); + } + return processor; + } + +} \ No newline at end of file diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java new file mode 100644 index 000000000..8ece77565 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java @@ -0,0 +1,82 @@ +/* + * Copyright 2012-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux.request; + +import java.util.ArrayList; +import java.util.List; + +import javax.servlet.http.HttpServletRequest; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.springframework.cloud.function.context.FunctionInspector; +import org.springframework.core.MethodParameter; +import org.springframework.core.Ordered; +import org.springframework.web.bind.support.WebDataBinderFactory; +import org.springframework.web.context.request.NativeWebRequest; +import org.springframework.web.method.support.HandlerMethodArgumentResolver; +import org.springframework.web.method.support.ModelAndViewContainer; + +/** + * Converter for request bodies of type Flux. + * + * @author Dave Syer + * + */ +public class FluxHandlerMethodArgumentResolver + implements HandlerMethodArgumentResolver, Ordered { + + public static final String HANDLER = FluxHandlerMethodArgumentResolver.class.getName() + + ".HANDLER"; + + private final ObjectMapper mapper; + + private FunctionInspector inspector; + + public FluxHandlerMethodArgumentResolver(FunctionInspector inspector, + ObjectMapper mapper) { + this.inspector = inspector; + this.mapper = mapper; + } + + @Override + public int getOrder() { + return Ordered.HIGHEST_PRECEDENCE; + } + + @Override + public Object resolveArgument(MethodParameter parameter, + ModelAndViewContainer mavContainer, NativeWebRequest webRequest, + WebDataBinderFactory binderFactory) throws Exception { + Object handler = webRequest.getAttribute(HANDLER, NativeWebRequest.SCOPE_REQUEST); + Class type = inspector.getInputType(inspector.getName(handler)); + if (type == null) { + type = Object.class; + } + List body = mapper.readValue( + webRequest.getNativeRequest(HttpServletRequest.class).getInputStream(), + mapper.getTypeFactory().constructCollectionLikeType(ArrayList.class, + type)); + return new FluxRequest(body); + } + + @Override + public boolean supportsParameter(MethodParameter parameter) { + return FluxRequest.class.isAssignableFrom(parameter.getParameterType()); + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxRequest.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxRequest.java new file mode 100644 index 000000000..00b2225fc --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxRequest.java @@ -0,0 +1,45 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux.request; + +import java.util.List; + +import reactor.core.publisher.Flux; + +/** + * @author Dave Syer + * + */ +public class FluxRequest { + + private List body; + + public FluxRequest(List body) { + this.body = body; + } + + public Flux flux() { + return Flux.fromIterable(body); + } + + public List body() { + return body; + } + +} + + diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxResponseBodyEmitter.java similarity index 93% rename from spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java rename to spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxResponseBodyEmitter.java index d175481cf..6c99697c0 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxResponseBodyEmitter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.cloud.function.web.flux; +package org.springframework.cloud.function.web.flux.response; import org.reactivestreams.Publisher; @@ -41,7 +41,8 @@ class FluxResponseBodyEmitter extends ResponseBodyEmitter { public FluxResponseBodyEmitter(MediaType mediaType, Publisher observable) { super(); this.mediaType = mediaType; - new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); + new ResponseBodyEmitterSubscriber<>(mediaType, observable, this, + MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)); } @Override diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxResponseSseEmitter.java similarity index 91% rename from spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java rename to spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxResponseSseEmitter.java index 9d63be7fb..2c5725dc9 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxResponseSseEmitter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.cloud.function.web.flux; +package org.springframework.cloud.function.web.flux.response; import org.reactivestreams.Publisher; @@ -33,12 +33,12 @@ import reactor.core.publisher.Flux; class FluxResponseSseEmitter extends SseEmitter { public FluxResponseSseEmitter(Publisher observable) { - this(MediaType.valueOf("text/event-stream"), observable); + this(MediaType.valueOf("text/plain"), observable); } public FluxResponseSseEmitter(MediaType mediaType, Publisher observable) { super(); - new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); + new ResponseBodyEmitterSubscriber<>(mediaType, observable, this, false); } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxReturnValueHandler.java similarity index 79% rename from spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java rename to spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxReturnValueHandler.java index 592e56613..1a8902684 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxReturnValueHandler.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.cloud.function.web.flux; +package org.springframework.cloud.function.web.flux.response; import java.time.Duration; import java.util.List; @@ -65,13 +65,19 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand @Override public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { - return returnValue != null && supportsReturnType(returnType); + if (returnValue != null) { + return supportsReturnType(returnType); + } + return false; } @Override public boolean supportsReturnType(MethodParameter returnType) { - return Publisher.class.isAssignableFrom(returnType.getParameterType()) - || isResponseEntity(returnType); + return (returnType.getParameterType() != null + && (Publisher.class.isAssignableFrom(returnType.getParameterType()) + || isResponseEntity(returnType))) + || Publisher.class + .isAssignableFrom(returnType.getMethod().getReturnType()); } private boolean isResponseEntity(MethodParameter returnType) { @@ -87,6 +93,12 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand public void handleReturnValue(Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception { + + if (returnValue == null) { + mavContainer.setRequestHandled(true); + return; + } + Object adaptFrom = returnValue; if (returnValue instanceof ResponseEntity) { ResponseEntity value = (ResponseEntity) returnValue; @@ -96,9 +108,19 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand } Publisher flux = (Publisher) adaptFrom; - MediaType mediaType = webRequest.getHeader("Accept") == null ? null - : MediaType.parseMediaTypes(webRequest.getHeader("Accept")).iterator() - .next(); + MediaType mediaType = null; + if (webRequest.getHeader("Accept") != null) { + for (MediaType type : MediaType + .parseMediaTypes(webRequest.getHeader("Accept"))) { + if (!MediaType.ALL.equals(type) + && MediaType.APPLICATION_JSON.isCompatibleWith(type)) { + mediaType = MediaType.APPLICATION_JSON; + break; + } else if (mediaType==null) { + mediaType = type; + } + } + } delegate.handleReturnValue(getEmitter(timeout, flux, mediaType), returnType, mavContainer, webRequest); } @@ -109,7 +131,8 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand : Flux.from(flux).timeout(Duration.ofMillis(timeout), Flux.empty()); if (!MediaType.ALL.equals(mediaType) && EVENT_STREAM.isCompatibleWith(mediaType)) { - return new FluxResponseSseEmitter<>(mediaType, exported); + // TODO: more subtle content negotiation + return new FluxResponseSseEmitter<>(MediaType.APPLICATION_JSON, exported); } return new FluxResponseBodyEmitter<>(mediaType, exported); } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/ResponseBodyEmitterSubscriber.java similarity index 88% rename from spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java rename to spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/ResponseBodyEmitterSubscriber.java index 0436ab12d..b55b96cfe 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/ResponseBodyEmitterSubscriber.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.cloud.function.web.flux; +package org.springframework.cloud.function.web.flux.response; import java.io.IOException; import java.util.concurrent.TimeoutException; @@ -49,11 +49,14 @@ class ResponseBodyEmitterSubscriber implements Subscriber { private boolean single; + private boolean json; + public ResponseBodyEmitterSubscriber(MediaType mediaType, Publisher observable, - ResponseBodyEmitter responseBodyEmitter) { + ResponseBodyEmitter responseBodyEmitter, boolean json) { this.mediaType = mediaType; this.responseBodyEmitter = responseBodyEmitter; + this.json = json; this.responseBodyEmitter.onTimeout(new Timeout()); this.responseBodyEmitter.onCompletion(new Complete()); this.single = observable instanceof Mono; @@ -72,8 +75,7 @@ class ResponseBodyEmitterSubscriber implements Subscriber { Object object = value; try { - if (!MediaType.ALL.equals(mediaType) - && MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { + if (isJson()) { if (!this.firstElementWritten) { if (!single) { responseBodyEmitter.send("["); @@ -83,7 +85,7 @@ class ResponseBodyEmitterSubscriber implements Subscriber { else { responseBodyEmitter.send(","); } - if (value.getClass() == String.class + if (!single && value.getClass() == String.class && !((String) value).contains("\"")) { object = "\"" + value + "\""; } @@ -104,8 +106,7 @@ class ResponseBodyEmitterSubscriber implements Subscriber { if (!completed) { completed = true; try { - if (!MediaType.ALL.equals(mediaType) - && MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { + if (isJson()) { if (!single) { if (!this.firstElementWritten) { responseBodyEmitter.send("[]"); @@ -133,8 +134,7 @@ class ResponseBodyEmitterSubscriber implements Subscriber { if (!completed) { completed = true; try { - if (!MediaType.ALL.equals(mediaType) - && MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { + if (isJson()) { if (!single) { if (!this.firstElementWritten) { responseBodyEmitter.send("["); @@ -150,6 +150,10 @@ class ResponseBodyEmitterSubscriber implements Subscriber { } } + private boolean isJson() { + return json; + } + class Complete implements Runnable { @Override diff --git a/spring-cloud-function-web/src/main/resources/META-INF/spring.factories b/spring-cloud-function-web/src/main/resources/META-INF/spring.factories index 682f401a7..812023313 100644 --- a/spring-cloud-function-web/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-function-web/src/main/resources/META-INF/spring.factories @@ -1,3 +1,2 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -org.springframework.cloud.function.web.FunctionHandlerMapping,\ org.springframework.cloud.function.web.flux.ReactorAutoConfiguration \ No newline at end of file diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/mvc/MvcRestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/mvc/MvcRestApplicationTests.java new file mode 100644 index 000000000..5cfb5112d --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/mvc/MvcRestApplicationTests.java @@ -0,0 +1,397 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.function.mvc; + +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.context.embedded.LocalServerPort; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.cloud.function.mvc.MvcRestApplicationTests.TestConfiguration; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.StringUtils; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Tests for vanilla MVC handling (no function layer). Validates the MVC customizations + * that are added in this project independently of the specific concerns of function. + * + * @author Dave Syer + * + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = TestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT) +public class MvcRestApplicationTests { + + private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); + @LocalServerPort + private int port; + @Autowired + private TestRestTemplate rest; + @Autowired + private TestConfiguration test; + + @Before + public void init() { + test.list.clear(); + } + + @Test + public void wordsSSE() throws Exception { + assertThat(rest.exchange( + RequestEntity.get(new URI("/words")).accept(EVENT_STREAM).build(), + String.class).getBody()).isEqualTo(sse("foo", "bar")); + } + + @Test + public void wordsJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/words")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + @Ignore("Fix error handling") + public void errorJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/bang")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\"]"); + } + + @Test + public void words() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/words")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + public void foos() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/foos")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"); + } + + @Test + public void getMore() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/get/more")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + @Ignore("Should this even work? Or do we need to be explicit about the JSON?") + public void updates() throws Exception { + ResponseEntity result = rest.exchange( + RequestEntity.post(new URI("/updates")).body("one\ntwo"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()).isEqualTo("onetwo"); + } + + @Test + public void updatesJson() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/updates")).contentType(MediaType.APPLICATION_JSON) + .body("[\"one\",\"two\"]"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]"); + } + + @Test + public void addFoos() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/addFoos")).contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"); + } + + @Test + public void timeout() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/timeout")).build(), String.class) + .getBody()).isEqualTo("[\"foo\"]"); + } + + @Test + public void emptyJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/empty")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[]"); + } + + @Test + public void sentences() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/sentences")).build(), String.class) + .getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); + } + + @Test + public void sentencesAcceptAny() throws Exception { + assertThat(rest.exchange( + RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(), + String.class).getBody()) + .isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); + } + + @Test + public void sentencesAcceptJson() throws Exception { + ResponseEntity result = rest + .exchange( + RequestEntity.get(new URI("/sentences")) + .accept(MediaType.APPLICATION_JSON).build(), + String.class); + assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); + assertThat(result.getHeaders().getContentType()) + .isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON); + } + + @Test + public void uppercase() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]"); + } + + @Test + public void uppercaseFoos() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/upFoos")).contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]"); + } + + @Test + public void transform() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/transform")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]"); + } + + @Test + public void postMore() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/post/more")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]"); + } + + @Test + public void uppercaseGet() { + assertThat(rest.getForObject("/uppercase/foo", String.class)).isEqualTo("[FOO]"); + } + + @Test + public void convertGet() { + assertThat(rest.getForObject("/wrap/123", String.class)).isEqualTo("..123.."); + } + + @Test + public void convertGetJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/entity/321")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("{\"value\":321}"); + } + + @Test + public void uppercaseJsonStream() throws Exception { + assertThat(rest + .exchange(RequestEntity.post(new URI("/maps")) + .contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class) + .getBody()).isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]"); + } + + @Test + public void uppercaseSSE() throws Exception { + assertThat(rest.exchange(RequestEntity.post(new URI("/uppercase")) + .accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class).getBody()) + .isEqualTo(sse("[FOO]", "[BAR]")); + } + + private String sse(String... values) { + return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n"; + } + + @EnableAutoConfiguration + @RestController + @Configuration + public static class TestConfiguration { + + private List list = new ArrayList<>(); + + @PostMapping({ "/uppercase", "/transform", "/post/more" }) + public Flux uppercase(@RequestBody List flux) { + return Flux.fromIterable(flux).log() + .map(value -> "[" + value.trim().toUpperCase() + "]"); + } + + @PostMapping("/upFoos") + public Flux upFoos(@RequestBody List list) { + return Flux.fromIterable(list).log() + .map(value -> new Foo(value.getValue().trim().toUpperCase())); + } + + @GetMapping("/uppercase/{id}") + public Mono uppercaseGet(@PathVariable String id) { + return Mono.just(id).map(value -> "[" + value.trim().toUpperCase() + "]"); + } + + @PostMapping("/wrap") + public Flux wrap(@RequestBody Flux flux) { + return flux.log().map(value -> ".." + value + ".."); + } + + @GetMapping("/wrap/{id}") + public Mono wrapGet(@PathVariable int id) { + return Mono.just(id).log().map(value -> ".." + value + ".."); + } + + @GetMapping("/entity/{id}") + public Mono> entity(@PathVariable Integer id) { + return Mono.just(id).log() + .map(value -> Collections.singletonMap("value", value)); + } + + @PostMapping("/maps") + public Flux> maps( + @RequestBody List> flux) { + return Flux.fromIterable(flux).map(value -> { + value.put("value", value.get("value").trim().toUpperCase()); + return value; + }); + } + + @GetMapping({ "/words", "/get/more" }) + public Flux words() { + return Flux.fromArray(new String[] { "foo", "bar" }); + } + + @GetMapping("/foos") + public Flux foos() { + return Flux.just(new Foo("foo"), new Foo("bar")); + } + + @PostMapping("/updates") + @ResponseStatus(HttpStatus.ACCEPTED) + public Flux updates(@RequestBody List list) { + Flux flux = Flux.fromIterable(list).cache(); + flux.subscribe(value -> this.list.add(value)); + return flux; + } + + @PostMapping("/addFoos") + @ResponseStatus(HttpStatus.ACCEPTED) + public Flux addFoos(@RequestBody List list) { + Flux flux = Flux.fromIterable(list).cache(); + flux.subscribe(value -> this.list.add(value.getValue())); + return flux; + } + + @GetMapping("/bang") + public Flux bang() { + return Flux.fromArray(new String[] { "foo", "bar" }).map(value -> { + if (value.equals("bar")) { + throw new RuntimeException("Bar"); + } + return value; + }); + } + + @GetMapping("/empty") + public Flux empty() { + return Flux.fromIterable(Collections.emptyList()); + } + + @GetMapping("/timeout") + public Flux timeout() { + return Flux.defer(() -> Flux.create(emitter -> { + emitter.next("foo"); + }).timeout(Duration.ofMillis(100L), Flux.empty())); + } + + @GetMapping("/sentences") + public Flux> sentences() { + return Flux.just(Arrays.asList("go", "home"), Arrays.asList("come", "back")); + } + + } + + public static class Foo { + private String value; + + public Foo(String value) { + this.value = value; + } + + Foo() { + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + } +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/PrefixTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/PrefixTests.java index f74ef8898..8250b6707 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/PrefixTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/PrefixTests.java @@ -56,7 +56,7 @@ public class PrefixTests { ResponseEntity result = rest .exchange(RequestEntity.get(new URI("/functions/words")).build(), String.class); assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); - assertThat(result.getBody()).isEqualTo("foobar"); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); } @EnableAutoConfiguration diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java index 70b1dcd0b..a26baafc2 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java @@ -16,6 +16,7 @@ package org.springframework.cloud.function.web; import java.net.URI; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -32,6 +33,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.context.embedded.LocalServerPort; import org.springframework.boot.test.context.SpringBootTest; @@ -57,7 +59,7 @@ import reactor.core.publisher.Flux; @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) public class RestApplicationTests { - private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); + private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM; @LocalServerPort private int port; @Autowired @@ -104,7 +106,26 @@ public class RestApplicationTests { ResponseEntity result = rest .exchange(RequestEntity.get(new URI("/words")).build(), String.class); assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); - assertThat(result.getBody()).isEqualTo("foobar"); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + + @Test + public void foos() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/foos")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"); + } + + @Test + public void qualifierFoos() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/foos")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"[FOO]\"},{\"value\":\"[BAR]\"}]"); } @Test @@ -112,7 +133,7 @@ public class RestApplicationTests { ResponseEntity result = rest .exchange(RequestEntity.get(new URI("/get/more")).build(), String.class); assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); - assertThat(result.getBody()).isEqualTo("foobar"); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); } @Test @@ -120,10 +141,11 @@ public class RestApplicationTests { ResponseEntity result = rest .exchange(RequestEntity.get(new URI("/bareWords")).build(), String.class); assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); - assertThat(result.getBody()).isEqualTo("foobar"); + assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]"); } @Test + @Ignore("Should this even work? Or do we need to be explicit about the JSON?") public void updates() throws Exception { ResponseEntity result = rest.exchange( RequestEntity.post(new URI("/updates")).body("one\ntwo"), String.class); @@ -133,13 +155,34 @@ public class RestApplicationTests { } @Test - public void bareUpdates() throws Exception { - ResponseEntity result = rest.exchange( - RequestEntity.post(new URI("/bareUpdates")).body("one\ntwo"), - String.class); + public void updatesJson() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/updates")).contentType(MediaType.APPLICATION_JSON) + .body("[\"one\",\"two\"]"), String.class); assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); assertThat(test.list).hasSize(2); - assertThat(result.getBody()).isEqualTo("onetwo"); + assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]"); + } + + @Test + public void addFoos() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/addFoos")).contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"); + } + + @Test + public void bareUpdates() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/bareUpdates")).contentType(MediaType.APPLICATION_JSON) + .body("[\"one\",\"two\"]"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]"); } @Test @@ -162,7 +205,7 @@ public class RestApplicationTests { public void sentences() throws Exception { assertThat(rest .exchange(RequestEntity.get(new URI("/sentences")).build(), String.class) - .getBody()).isEqualTo("[\"go\",\"home\"][\"come\",\"back\"]"); + .getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); } @Test @@ -170,7 +213,7 @@ public class RestApplicationTests { assertThat(rest.exchange( RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(), String.class).getBody()) - .isEqualTo("[\"go\",\"home\"][\"come\",\"back\"]"); + .isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); } @Test @@ -182,7 +225,7 @@ public class RestApplicationTests { String.class); assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); assertThat(result.getHeaders().getContentType()) - .isEqualTo(MediaType.APPLICATION_JSON); + .isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON); } @Test @@ -197,27 +240,46 @@ public class RestApplicationTests { } @Test - public void uppercase() { - assertThat(rest.postForObject("/uppercase", "foo\nbar", String.class)) - .isEqualTo("[FOO][BAR]"); + public void uppercase() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]"); } @Test - public void bareUppercase() { - assertThat(rest.postForObject("/bareUppercase", "foo\nbar", String.class)) - .isEqualTo("[FOO][BAR]"); + public void uppercaseFoos() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + // TODO: does not require a content type header, but the plain MVC version + // does + .post(new URI("/upFoos")).contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class); + assertThat(result.getBody()) + .isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]"); } @Test - public void transform() { - assertThat(rest.postForObject("/transform", "foo\nbar", String.class)) - .isEqualTo("[FOO][BAR]"); + public void bareUppercase() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/bareUppercase")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]"); } @Test - public void postMore() { - assertThat(rest.postForObject("/post/more", "foo\nbar", String.class)) - .isEqualTo("[FOO][BAR]"); + public void transform() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/transform")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]"); + } + + @Test + public void postMore() throws Exception { + ResponseEntity result = rest.exchange(RequestEntity + .post(new URI("/post/more")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class); + assertThat(result.getBody()).isEqualTo("[\"[FOO]\",\"[BAR]\"]"); } @Test @@ -237,7 +299,8 @@ public class RestApplicationTests { @Test public void supplierFirst() { - assertThat(rest.getForObject("/not/a/function", String.class)).isEqualTo("hello"); + assertThat(rest.getForObject("/not/a/function", String.class)) + .isEqualTo("[\"hello\"]"); } @Test @@ -256,26 +319,15 @@ public class RestApplicationTests { // The new line in the middle is optional .body("[{\"value\":\"foo\"},\n{\"value\":\"bar\"}]"), String.class).getBody()) - .isEqualTo("{\"value\":\"FOO\"}{\"value\":\"BAR\"}"); - } - - @Test - public void uppercaseJsonStream() throws Exception { - assertThat(rest - .exchange(RequestEntity.post(new URI("/maps")) - .contentType(MediaType.APPLICATION_JSON) - // TODO: make this work without newline separator - .body("{\"value\":\"foo\"}\n{\"value\":\"bar\"}"), String.class) - .getBody()).isEqualTo("{\"value\":\"FOO\"}{\"value\":\"BAR\"}"); + .isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]"); } @Test public void uppercaseSSE() throws Exception { - assertThat( - rest.exchange( - RequestEntity.post(new URI("/uppercase")).accept(EVENT_STREAM) - .contentType(EVENT_STREAM).body(sse("foo", "bar")), - String.class).getBody()).isEqualTo(sse("[FOO]", "[BAR]")); + assertThat(rest.exchange(RequestEntity.post(new URI("/uppercase")) + .accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class).getBody()) + .isEqualTo(sse("[FOO]", "[BAR]")); } private String sse(String... values) { @@ -299,6 +351,12 @@ public class RestApplicationTests { return value -> "[" + value.trim().toUpperCase() + "]"; } + @Bean + public Function, Flux> upFoos() { + return flux -> flux.log() + .map(value -> new Foo(value.getValue().trim().toUpperCase())); + } + @Bean public Function, Flux> wrap() { return flux -> flux.log().map(value -> ".." + value + ".."); @@ -320,7 +378,18 @@ public class RestApplicationTests { @Bean({ "words", "get/more" }) public Supplier> words() { - return () -> Flux.fromArray(new String[] { "foo", "bar" }); + return () -> Flux.just("foo", "bar"); + } + + @Bean + public Supplier> foos() { + return () -> Flux.just(new Foo("foo"), new Foo("bar")); + } + + @Bean + @Qualifier("foos") + public Function qualifier() { + return value -> new Foo("[" + value.trim().toUpperCase() + "]"); } @Bean @@ -333,6 +402,11 @@ public class RestApplicationTests { return flux -> flux.subscribe(value -> list.add(value)); } + @Bean + public Consumer> addFoos() { + return flux -> flux.subscribe(value -> list.add(value.getValue())); + } + @Bean public Consumer bareUpdates() { return value -> list.add(value); @@ -365,9 +439,9 @@ public class RestApplicationTests { @Bean public Supplier> timeout() { - return () -> Flux.create(emitter -> { + return () -> Flux.defer(() -> Flux.create(emitter -> { emitter.next("foo"); - }); + }).timeout(Duration.ofMillis(100L), Flux.empty())); } @Bean @@ -378,4 +452,23 @@ public class RestApplicationTests { } + public static class Foo { + private String value; + + public Foo(String value) { + this.value = value; + } + + Foo() { + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + } + } diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverterTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverterTests.java deleted file mode 100644 index ee936f543..000000000 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverterTests.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright 2012-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.web.flux; - -import org.junit.Test; - -import org.springframework.http.MediaType; -import org.springframework.mock.http.MockHttpInputMessage; - -import static org.assertj.core.api.Assertions.assertThat; - -import reactor.core.publisher.Flux; - -/** - * @author Dave Syer - * - */ -public class FluxHttpMessageConverterTests { - - private FluxHttpMessageConverter converter = new FluxHttpMessageConverter(); - - private Class> type = null; - - @Test - public void newlines() throws Exception { - MockHttpInputMessage message = new MockHttpInputMessage("foo\nbar".getBytes()); - assertThat(converter.read(type, message).collectList().block()).contains("foo", - "bar"); - } - - @Test - public void sse() throws Exception { - MockHttpInputMessage message = new MockHttpInputMessage( - "data:foo\n\ndata:bar".getBytes()); - message.getHeaders().setContentType(MediaType.valueOf("text/event-stream")); - assertThat(converter.read(type, message).collectList().block()).contains("foo", - "bar"); - } - - @Test - public void jsonStream() throws Exception { - MockHttpInputMessage message = new MockHttpInputMessage( - "{\"value\":\"foo\"}{\"value\":\"barrier\"}".getBytes()); - message.getHeaders().setContentType(MediaType.APPLICATION_JSON); - assertThat(converter.read(type, message).collectList().block()) - .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); - } - - @Test - public void jsonStreamWhitespace() throws Exception { - MockHttpInputMessage message = new MockHttpInputMessage( - "{\"value\":\"foo\"} {\"value\":\"barrier\"} ".getBytes()); - message.getHeaders().setContentType(MediaType.APPLICATION_JSON); - assertThat(converter.read(type, message).collectList().block()) - .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); - } - - @Test - public void jsonStreamNewline() throws Exception { - MockHttpInputMessage message = new MockHttpInputMessage( - "{\"value\":\"foo\"}\n{\"value\":\"barrier\"}".getBytes()); - message.getHeaders().setContentType(MediaType.APPLICATION_JSON); - assertThat(converter.read(type, message).collectList().block()) - .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); - } - - @Test - public void jsonArray() throws Exception { - MockHttpInputMessage message = new MockHttpInputMessage( - "[{\"value\":\"foo\"},{\"value\":\"barrier\"}]".getBytes()); - message.getHeaders().setContentType(MediaType.APPLICATION_JSON); - assertThat(converter.read(type, message).collectList().block()) - .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); - } - - @Test - public void jsonArrayWhitespace() throws Exception { - MockHttpInputMessage message = new MockHttpInputMessage( - "[{\"value\":\"foo\"}, {\"value\":\"barrier\"}] ".getBytes()); - message.getHeaders().setContentType(MediaType.APPLICATION_JSON); - assertThat(converter.read(type, message).collectList().block()) - .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); - } - - @Test - public void jsonArrayNewline() throws Exception { - MockHttpInputMessage message = new MockHttpInputMessage( - "[{\"value\":\"foo\"},\n{\"value\":\"barrier\"}]".getBytes()); - message.getHeaders().setContentType(MediaType.APPLICATION_JSON); - assertThat(converter.read(type, message).collectList().block()) - .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); - } - -}