From 216e5c9207b68adf02e0ebb70e8357298124efda Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Wed, 11 Jan 2017 10:39:43 -0500 Subject: [PATCH] Add MVC body processors to handle Flux We don't need to cover all the possible uses of Flux (only Flux really), so this isn't comprehensive coverage of all the features in Spring WebFlux, but it's good enough for functions to run with Spring Boot 1.5. --- spring-cloud-function-context/pom.xml | 12 - spring-cloud-function-deployer/pom.xml | 14 - .../spring-cloud-function-sample-pojo/pom.xml | 16 +- .../spring-cloud-function-sample/pom.xml | 14 +- spring-cloud-function-web/pom.xml | 23 +- .../function/web/FunctionController.java | 3 +- .../cloud/function/web/RestApplication.java | 299 +--------------- .../web/flux/FluxHttpMessageConverter.java | 323 ++++++++++++++++++ .../web/flux/FluxResponseBodyEmitter.java | 41 +++ .../web/flux/FluxResponseSseEmitter.java | 42 +++ .../web/flux/FluxReturnValueHandler.java | 93 +++++ .../web/flux/ReactorAutoConfiguration.java | 64 ++++ .../flux/ResponseBodyEmitterSubscriber.java | 130 +++++++ .../function/web/RestApplicationTests.java | 22 +- .../flux/FluxHttpMessageConverterTests.java | 108 ++++++ 15 files changed, 820 insertions(+), 384 deletions(-) create mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverter.java create mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java create mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java create mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java create mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java create mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java create mode 100644 spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverterTests.java diff --git a/spring-cloud-function-context/pom.xml b/spring-cloud-function-context/pom.xml index 757fe4d27..863e28b3c 100644 --- a/spring-cloud-function-context/pom.xml +++ b/spring-cloud-function-context/pom.xml @@ -37,18 +37,6 @@ - - - - org.springframework.boot - spring-boot-dependencies - 2.0.0.BUILD-SNAPSHOT - pom - import - - - - diff --git a/spring-cloud-function-deployer/pom.xml b/spring-cloud-function-deployer/pom.xml index dccb0937b..f8daf6a0e 100644 --- a/spring-cloud-function-deployer/pom.xml +++ b/spring-cloud-function-deployer/pom.xml @@ -54,20 +54,6 @@ - - org.springframework.boot - spring-boot-dependencies - 2.0.0.BUILD-SNAPSHOT - pom - import - - - org.springframework.boot.experimental - spring-boot-dependencies-web-reactive - 0.1.0.BUILD-SNAPSHOT - pom - import - org.springframework.cloud spring-cloud-function-parent diff --git a/spring-cloud-function-samples/spring-cloud-function-sample-pojo/pom.xml b/spring-cloud-function-samples/spring-cloud-function-sample-pojo/pom.xml index fb2b1973a..620f889ba 100644 --- a/spring-cloud-function-samples/spring-cloud-function-sample-pojo/pom.xml +++ b/spring-cloud-function-samples/spring-cloud-function-sample-pojo/pom.xml @@ -7,13 +7,13 @@ function-sample-pojo 1.0.0.BUILD-SNAPSHOT jar - spring-cloud-function-sample + spring-cloud-function-sample-pojo Spring Cloud Function Web Support org.springframework.boot spring-boot-starter-parent - 2.0.0.BUILD-SNAPSHOT + 1.5.0.BUILD-SNAPSHOT @@ -41,18 +41,6 @@ - - - - org.springframework.boot - spring-boot-dependencies - 2.0.0.BUILD-SNAPSHOT - pom - import - - - - diff --git a/spring-cloud-function-samples/spring-cloud-function-sample/pom.xml b/spring-cloud-function-samples/spring-cloud-function-sample/pom.xml index c2d277cc1..5940479cd 100644 --- a/spring-cloud-function-samples/spring-cloud-function-sample/pom.xml +++ b/spring-cloud-function-samples/spring-cloud-function-sample/pom.xml @@ -13,7 +13,7 @@ org.springframework.boot spring-boot-starter-parent - 2.0.0.BUILD-SNAPSHOT + 1.5.0.BUILD-SNAPSHOT @@ -46,18 +46,6 @@ - - - - org.springframework.boot - spring-boot-dependencies - 2.0.0.BUILD-SNAPSHOT - pom - import - - - - diff --git a/spring-cloud-function-web/pom.xml b/spring-cloud-function-web/pom.xml index b5a3b0bba..6f6d16ba7 100644 --- a/spring-cloud-function-web/pom.xml +++ b/spring-cloud-function-web/pom.xml @@ -20,8 +20,8 @@ - org.springframework.boot.experimental - spring-boot-starter-web-reactive + org.springframework.boot + spring-boot-starter-web org.springframework.cloud @@ -34,25 +34,6 @@ - - - - org.springframework.boot - spring-boot-dependencies - 2.0.0.BUILD-SNAPSHOT - pom - import - - - org.springframework.boot.experimental - spring-boot-dependencies-web-reactive - 0.1.0.BUILD-SNAPSHOT - pom - import - - - - diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java index 42646d51c..117043345 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java @@ -20,7 +20,6 @@ import java.util.function.Function; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.boot.context.embedded.ReactiveServerProperties; import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; @@ -35,7 +34,7 @@ import reactor.core.publisher.Flux; * */ @RestController -@ConditionalOnClass({ RestController.class, ReactiveServerProperties.class }) +@ConditionalOnClass(RestController.class) public class FunctionController { @Value("${debug:${DEBUG:false}}") diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java index 262bf7d26..aea2fd5a3 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,311 +16,16 @@ package org.springframework.cloud.function.web; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.function.Function; - -import org.apache.commons.io.Charsets; -import org.reactivestreams.Publisher; - import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.core.ResolvableType; -import org.springframework.core.codec.AbstractDecoder; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.http.codec.DecoderHttpMessageReader; -import org.springframework.http.codec.HttpMessageReader; -import org.springframework.http.codec.ServerSentEventHttpMessageReader; -import org.springframework.util.MimeType; -import org.springframework.util.StreamUtils; -import org.springframework.web.reactive.config.WebReactiveConfigurer; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; -import reactor.core.publisher.Flux; /** * @author Mark Fisher */ @SpringBootApplication -public class RestApplication implements WebReactiveConfigurer { - - @Override - public void extendMessageReaders(List> readers) { - readers.add(0, new ServerSentEventHttpMessageReader()); - // Instead of the default JSON decoder we want to keep the Strings unparsed - readers.add(1, new DecoderHttpMessageReader<>(new JsonObjectDecoder())); - } +public class RestApplication { public static void main(String[] args) { SpringApplication.run(RestApplication.class, args); } } - -/** - * Decode an arbitrary split byte stream representing JSON objects to a byte stream where - * each chunk is a well-formed JSON object. - * - *

- * This class does not do any real parsing or validation. A sequence of bytes is - * considered a JSON object/array if it contains a matching number of opening and closing - * braces/brackets. - * - *

- * Based on Netty - * JsonObjectDecoder - * - * @author Sebastien Deleuze - * - */ -// Copied from spring-web -class JsonObjectDecoder extends AbstractDecoder { - - private static final int ST_CORRUPTED = -1; - - private static final int ST_INIT = 0; - - private static final int ST_DECODING_NORMAL = 1; - - private static final int ST_DECODING_ARRAY_STREAM = 2; - - private final int maxObjectLength; - - private final boolean streamArrayElements; - - public JsonObjectDecoder() { - // 1 MB - this(1024 * 1024); - } - - public JsonObjectDecoder(int maxObjectLength) { - this(maxObjectLength, true); - } - - public JsonObjectDecoder(boolean streamArrayElements) { - this(1024 * 1024, streamArrayElements); - } - - /** - * @param maxObjectLength maximum number of bytes a JSON object/array may use - * (including braces and all). Objects exceeding this length are dropped and an - * {@link IllegalStateException} is thrown. - * @param streamArrayElements if set to true and the "top level" JSON object is an - * array, each of its entries is passed through the pipeline individually and - * immediately after it was fully received, allowing for arrays with - */ - public JsonObjectDecoder(int maxObjectLength, boolean streamArrayElements) { - super(new MimeType("application", "json", StandardCharsets.UTF_8), - new MimeType("application", "*+json", StandardCharsets.UTF_8)); - if (maxObjectLength < 1) { - throw new IllegalArgumentException("maxObjectLength must be a positive int"); - } - this.maxObjectLength = maxObjectLength; - this.streamArrayElements = streamArrayElements; - } - - @Override - public Flux decode(Publisher inputStream, - ResolvableType elementType, MimeType mimeType, Map hints) { - - return Flux.from(inputStream) - .flatMap(new Function>() { - - int openBraces; - int index; - int state; - boolean insideString; - ByteBuf input; - Integer writerIndex; - - @Override - public Publisher apply(DataBuffer buffer) { - List chunks = new ArrayList<>(); - if (this.input == null) { - this.input = Unpooled.copiedBuffer(buffer.asByteBuffer()); - DataBufferUtils.release(buffer); - this.writerIndex = this.input.writerIndex(); - } - else { - this.index = this.index - this.input.readerIndex(); - this.input = Unpooled.copiedBuffer(this.input, - Unpooled.copiedBuffer(buffer.asByteBuffer())); - DataBufferUtils.release(buffer); - this.writerIndex = this.input.writerIndex(); - } - if (this.state == ST_CORRUPTED) { - this.input.skipBytes(this.input.readableBytes()); - return Flux - .error(new IllegalStateException("Corrupted stream")); - } - if (this.writerIndex > maxObjectLength) { - // buffer size exceeded maxObjectLength; discarding the - // complete buffer. - this.input.skipBytes(this.input.readableBytes()); - reset(); - return Flux.error(new IllegalStateException( - "object length exceeds " + maxObjectLength + ": " - + this.writerIndex + " bytes discarded")); - } - DataBufferFactory dataBufferFactory = buffer.factory(); - for (/* use current index */; this.index < this.writerIndex; this.index++) { - byte c = this.input.getByte(this.index); - if (this.state == ST_DECODING_NORMAL) { - decodeByte(c, this.input, this.index); - - // All opening braces/brackets have been closed. That's - // enough to conclude - // that the JSON object/array is complete. - if (this.openBraces == 0) { - ByteBuf json = extractObject(this.input, - this.input.readerIndex(), - this.index + 1 - this.input.readerIndex()); - if (json != null) { - chunks.add( - dataBufferFactory.wrap(json.nioBuffer())); - } - - // The JSON object/array was extracted => discard the - // bytes from - // the input buffer. - this.input.readerIndex(this.index + 1); - // Reset the object state to get ready for the next - // JSON object/text - // coming along the byte stream. - reset(); - } - } - else if (this.state == ST_DECODING_ARRAY_STREAM) { - decodeByte(c, this.input, this.index); - - if (!this.insideString - && (this.openBraces == 1 && c == ',' - || this.openBraces == 0 && c == ']')) { - // skip leading spaces. No range check is needed and - // the loop will terminate - // because the byte at position index is not a - // whitespace. - for (int i = this.input.readerIndex(); Character - .isWhitespace(this.input.getByte(i)); i++) { - this.input.skipBytes(1); - } - - // skip trailing spaces. - int idxNoSpaces = this.index - 1; - while (idxNoSpaces >= this.input.readerIndex() - && Character.isWhitespace( - this.input.getByte(idxNoSpaces))) { - - idxNoSpaces--; - } - - ByteBuf json = extractObject(this.input, - this.input.readerIndex(), - idxNoSpaces + 1 - this.input.readerIndex()); - - if (json != null) { - chunks.add( - dataBufferFactory.wrap(json.nioBuffer())); - } - - this.input.readerIndex(this.index + 1); - - if (c == ']') { - reset(); - } - } - // JSON object/array detected. Accumulate bytes until all - // braces/brackets are closed. - } - else if (c == '{' || c == '[') { - initDecoding(c, streamArrayElements); - - if (this.state == ST_DECODING_ARRAY_STREAM) { - // Discard the array bracket - this.input.skipBytes(1); - } - // Discard leading spaces in front of a JSON object/array. - } - else if (Character.isWhitespace(c)) { - this.input.skipBytes(1); - } - else { - this.state = ST_CORRUPTED; - return Flux.error(new IllegalStateException( - "invalid JSON received at byte position " - + this.index + ": " - + ByteBufUtil.hexDump(this.input))); - } - } - - return Flux.fromIterable(chunks).map(buf -> { - try { - return StreamUtils.copyToString(buf.asInputStream(), - Charsets.UTF_8); - } - catch (IOException e) { - throw new IllegalStateException( - "Cannot convert buffer to String" + buf, e); - } - }); - } - - /** - * Override this method if you want to filter the json objects/arrays - * that get passed through the pipeline. - */ - protected ByteBuf extractObject(ByteBuf buffer, int index, - int length) { - return buffer.slice(index, length).retain(); - } - - private void decodeByte(byte c, ByteBuf input, int index) { - if ((c == '{' || c == '[') && !this.insideString) { - this.openBraces++; - } - else if ((c == '}' || c == ']') && !this.insideString) { - this.openBraces--; - } - else if (c == '"') { - // start of a new JSON string. It's necessary to detect - // strings as they may - // also contain braces/brackets and that could lead to - // incorrect results. - if (!this.insideString) { - this.insideString = true; - // If the double quote wasn't escaped then this is the end - // of a string. - } - else if (input.getByte(index - 1) != '\\') { - this.insideString = false; - } - } - } - - private void initDecoding(byte openingBrace, - boolean streamArrayElements) { - this.openBraces = 1; - if (openingBrace == '[' && streamArrayElements) { - this.state = ST_DECODING_ARRAY_STREAM; - } - else { - this.state = ST_DECODING_NORMAL; - } - } - - private void reset() { - this.insideString = false; - this.state = ST_INIT; - this.openBraces = 0; - } - }); - } - -} \ No newline at end of file diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverter.java new file mode 100644 index 000000000..7db689bf5 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverter.java @@ -0,0 +1,323 @@ +/* + * Copyright 2012-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.springframework.http.HttpInputMessage; +import org.springframework.http.HttpOutputMessage; +import org.springframework.http.MediaType; +import org.springframework.http.converter.HttpMessageConverter; +import org.springframework.http.converter.HttpMessageNotReadableException; +import org.springframework.http.converter.HttpMessageNotWritableException; + +import reactor.core.publisher.Flux; + +/** + * Converter for request bodies of type Flux. + * + * @author Dave Syer + * + */ +public class FluxHttpMessageConverter implements HttpMessageConverter> { + + private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); + + @Override + public boolean canRead(Class clazz, MediaType mediaType) { + return Flux.class.isAssignableFrom(clazz); + } + + @Override + public boolean canWrite(Class clazz, MediaType mediaType) { + return false; + } + + @Override + public List getSupportedMediaTypes() { + return Arrays.asList(MediaType.ALL); + } + + @Override + public Flux read(Class> clazz, + HttpInputMessage inputMessage) + throws IOException, HttpMessageNotReadableException { + + MediaType mediaType = inputMessage.getHeaders().getContentType(); + if (mediaType != null) { + if (mediaType.includes(MediaType.APPLICATION_JSON)) { + return new JsonObjectDecoder().decode(inputMessage.getBody()); + } + if (mediaType.includes(EVENT_STREAM)) { + return splitOnSseData(inputMessage); + } + } + + return splitOnLineEndings(inputMessage); + } + + private Flux splitOnLineEndings(HttpInputMessage inputMessage) { + return Flux.create(sink -> { + BufferedReader reader; + try { + reader = new BufferedReader( + new InputStreamReader(inputMessage.getBody())); + String line = reader.readLine(); + while (line != null) { + sink.next(line); + line = reader.readLine(); + } + } + catch (IOException e) { + sink.error(e); + } + sink.complete(); + }); + } + + private Flux splitOnSseData(HttpInputMessage inputMessage) { + return Flux.create(sink -> { + BufferedReader reader; + StringBuffer buffer = new StringBuffer(); + int emptyCount = 0; + try { + reader = new BufferedReader( + new InputStreamReader(inputMessage.getBody())); + String line = reader.readLine(); + while (line != null) { + if (line.length() == 0) { + emptyCount++; + } + else { + if (buffer.length() == 0) { + if (line.startsWith("data:")) { + line = line.length() > "data:".length() + ? line.substring("data:".length()) : ""; + } + } + else { + buffer.append("\n"); + } + buffer.append(line); + } + if (emptyCount > 0) { + sink.next(buffer.toString()); + buffer.setLength(0); + emptyCount = 0; + while (line != null && line.length() == 0) { + line = reader.readLine(); + } + } + else { + line = reader.readLine(); + } + } + if (buffer.length()>0) { + sink.next(buffer.toString()); + } + } + catch (IOException e) { + sink.error(e); + } + sink.complete(); + }); + } + + @Override + public void write(Flux t, MediaType contentType, + HttpOutputMessage outputMessage) + throws IOException, HttpMessageNotWritableException { + } + + static class JsonObjectDecoder { + + private static final int ST_CORRUPTED = -1; + + private static final int ST_INIT = 0; + + private static final int ST_DECODING_NORMAL = 1; + + private static final int ST_DECODING_ARRAY_STREAM = 2; + + private final int maxObjectLength = 1024 * 1024; + + private int openBraces; + private int state; + private boolean insideString; + private int writerIndex; + private boolean streamArrayElements = true; + + public Flux decode(InputStream body) { + InputStreamReader reader = new InputStreamReader(body); + char[] buffer = new char[1024]; + try { + List chunks = new ArrayList<>(); + int read = reader.read(buffer); + this.writerIndex += read; + while (read >= 0) { + if (this.state == ST_CORRUPTED) { + return Flux.error(new IllegalStateException("Corrupted stream")); + } + if (this.writerIndex > maxObjectLength) { + // buffer size exceeded maxObjectLength; discarding the complete + // buffer. + reset(); + return Flux.error(new IllegalStateException( + "object length exceeds " + maxObjectLength + ": " + + this.writerIndex + " bytes discarded")); + } + int point = 0; + for (int index = 0; index < read; index++) { + char c = buffer[index]; + if (this.state == ST_DECODING_NORMAL) { + decodeByte(c, buffer, index); + + // All opening braces/brackets have been closed. That's enough + // to conclude that the JSON object/array is complete. + if (this.openBraces == 0) { + char[] json = extractObject(buffer, point, + index + 1 - point); + if (json != null) { + chunks.add(new String(json)); + } + + // The JSON object/array was extracted => discard the + // bytes from the input buffer. + point += index + 1 - point; + // Reset the object state to get ready for the next JSON + // object/text coming along the byte stream. + reset(); + } + } + else if (this.state == ST_DECODING_ARRAY_STREAM) { + decodeByte(c, buffer, index); + + if (!this.insideString && (this.openBraces == 1 && c == ',' + || this.openBraces == 0 && c == ']')) { + // skip leading spaces. No range check is needed and the + // loop will terminate because the byte at position index + // is not a whitespace. + for (int i = point; Character + .isWhitespace(buffer[i]); i++) { + point++; + } + + // skip trailing spaces. + int idxNoSpaces = index - 1; + while (idxNoSpaces >= 0 + && Character.isWhitespace(buffer[idxNoSpaces])) { + idxNoSpaces--; + } + + char[] json = extractObject(buffer, point, + idxNoSpaces + 1 - point); + if (json != null) { + chunks.add(new String(json)); + } + + point += index + 1 - point; + + if (c == ']') { + reset(); + } + } + // JSON object/array detected. Accumulate bytes until all + // braces/brackets are closed. + } + else if (c == '{' || c == '[') { + initDecoding(c, this.streamArrayElements); + + if (this.state == ST_DECODING_ARRAY_STREAM) { + // Discard the array bracket + point++; + } + // Discard leading spaces in front of a JSON object/array. + } + else if (Character.isWhitespace(c)) { + point++; + } + else { + this.state = ST_CORRUPTED; + return Flux.error(new IllegalStateException( + "invalid JSON received at byte position " + + writerIndex)); + } + } + read = reader.read(buffer); + } + + return Flux.fromIterable(chunks); + } + catch (IOException e) { + return Flux.error(new IllegalStateException("Cannot read stream", e)); + } + } + + private char[] extractObject(char[] buffer, int index, int length) { + if (length <= 0) { + return null; + } + return Arrays.copyOfRange(buffer, index, index + length); + } + + private void decodeByte(char c, char[] input, int index) { + if ((c == '{' || c == '[') && !this.insideString) { + this.openBraces++; + } + else if ((c == '}' || c == ']') && !this.insideString) { + this.openBraces--; + } + else if (c == '"') { + // start of a new JSON string. It's necessary to detect strings as they + // may also contain braces/brackets and that could lead to incorrect + // results. + if (!this.insideString) { + this.insideString = true; + // If the double quote wasn't escaped then this is the end of a + // string. + } + else if (input[index - 1] != '\\') { + this.insideString = false; + } + } + } + + private void initDecoding(char openingBrace, boolean streamArrayElements) { + this.openBraces = 1; + if (openingBrace == '[' && streamArrayElements) { + this.state = ST_DECODING_ARRAY_STREAM; + } + else { + this.state = ST_DECODING_NORMAL; + } + } + + private void reset() { + this.insideString = false; + this.state = ST_INIT; + this.openBraces = 0; + } + + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java new file mode 100644 index 000000000..ae449caa3 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java @@ -0,0 +1,41 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import org.springframework.http.MediaType; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; + +import reactor.core.publisher.Flux; + +/** + * A specialized {@link ResponseBodyEmitter} that handles {@link Flux} return types. + * + * @author Dave Syer + */ +class FluxResponseBodyEmitter extends ResponseBodyEmitter { + + public FluxResponseBodyEmitter(Flux observable) { + this(null, null, observable); + } + + public FluxResponseBodyEmitter(Long timeout, MediaType mediaType, + Flux observable) { + super(timeout); + new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java new file mode 100644 index 000000000..54b55911d --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java @@ -0,0 +1,42 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import org.springframework.http.MediaType; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import reactor.core.publisher.Flux; + +/** + * A specialized {@link ResponseBodyEmitter} that handles {@link Flux} return types with + * SSE streams. + * + * @author Dave Syer + */ +class FluxResponseSseEmitter extends SseEmitter { + + public FluxResponseSseEmitter(Flux observable) { + this(null, MediaType.valueOf("text/event-stream"), observable); + } + + public FluxResponseSseEmitter(Long timeout, MediaType mediaType, Flux observable) { + super(timeout); + new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java new file mode 100644 index 000000000..f469f0d96 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java @@ -0,0 +1,93 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import java.util.List; + +import org.springframework.core.MethodParameter; +import org.springframework.core.ResolvableType; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.http.converter.HttpMessageConverter; +import org.springframework.web.context.request.NativeWebRequest; +import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; +import org.springframework.web.method.support.ModelAndViewContainer; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler; + +import reactor.core.publisher.Flux; + +/** + * A specialized {@link AsyncHandlerMethodReturnValueHandler} that handles {@link Flux} + * return types. + * + * @author Dave Syer + */ +public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHandler { + + private ResponseBodyEmitterReturnValueHandler delegate; + private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); + + public FluxReturnValueHandler(List> messageConverters) { + delegate = new ResponseBodyEmitterReturnValueHandler(messageConverters); + } + + @Override + public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { + return returnValue != null && supportsReturnType(returnType); + } + + @Override + public boolean supportsReturnType(MethodParameter returnType) { + return Flux.class.isAssignableFrom(returnType.getParameterType()) + || isResponseEntity(returnType); + } + + private boolean isResponseEntity(MethodParameter returnType) { + if (ResponseEntity.class.isAssignableFrom(returnType.getParameterType())) { + Class bodyType = ResolvableType.forMethodParameter(returnType) + .getGeneric(0).resolve(); + return bodyType != null && Flux.class.isAssignableFrom(bodyType); + } + return false; + } + + @Override + public void handleReturnValue(Object returnValue, MethodParameter returnType, + ModelAndViewContainer mavContainer, NativeWebRequest webRequest) + throws Exception { + Object adaptFrom = returnValue; + if (returnValue instanceof ResponseEntity) { + adaptFrom = ((ResponseEntity) returnValue).getBody(); + } + Flux flux = (Flux) adaptFrom; + + MediaType mediaType = webRequest.getHeader("Accept") == null ? null + : MediaType.parseMediaTypes(webRequest.getHeader("Accept")).iterator() + .next(); + delegate.handleReturnValue(getEmitter(1000L, flux, mediaType), + returnType, mavContainer, webRequest); + } + + private ResponseBodyEmitter getEmitter(Long timeout, Flux flux, MediaType mediaType) { + if (EVENT_STREAM.isCompatibleWith(mediaType)) { + return new FluxResponseSseEmitter<>(timeout, mediaType, flux); + } + return new FluxResponseBodyEmitter<>(timeout, mediaType, flux); + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java new file mode 100644 index 000000000..271c52a2f --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java @@ -0,0 +1,64 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication; +import org.springframework.boot.autoconfigure.web.HttpMessageConverters; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; +import org.springframework.web.method.support.HandlerMethodReturnValueHandler; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter; + +import reactor.core.publisher.Flux; + +/** + * @author Dave Syer + */ +@Configuration +@ConditionalOnWebApplication +@ConditionalOnClass({ Flux.class, AsyncHandlerMethodReturnValueHandler.class }) +public class ReactorAutoConfiguration extends WebMvcConfigurerAdapter { + + @Autowired + private FluxReturnValueHandler returnValueHandler; + + @Bean + public FluxReturnValueHandler fluxReturnValueHandler( + HttpMessageConverters converters) { + return new FluxReturnValueHandler(converters.getConverters()); + } + + @Configuration + protected static class MessageConverters { + @Bean + public HttpMessageConverters httpMessageConverters() { + return new HttpMessageConverters(new FluxHttpMessageConverter()); + } + } + + @Override + public void addReturnValueHandlers( + List returnValueHandlers) { + returnValueHandlers.add(returnValueHandler); + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java new file mode 100644 index 000000000..f06130039 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java @@ -0,0 +1,130 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import java.io.IOException; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import org.springframework.http.MediaType; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; + +import reactor.core.publisher.Flux; + +/** + * Subscriber that emits any value produced by the {@link Flux} into the delegated + * {@link ResponseBodyEmitter}. + * + * @author Dave Syer + */ +class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { + + private final MediaType mediaType; + + private Subscription subscription; + + private final ResponseBodyEmitter responseBodyEmitter; + + private boolean completed; + + private boolean firstElementWritten; + + public ResponseBodyEmitterSubscriber(MediaType mediaType, Flux observable, + ResponseBodyEmitter responseBodyEmitter) { + + this.mediaType = mediaType; + this.responseBodyEmitter = responseBodyEmitter; + this.responseBodyEmitter.onTimeout(this); + this.responseBodyEmitter.onCompletion(this); + observable.subscribe(this); + } + + @Override + public void onSubscribe(Subscription subscription) { + if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { + try { + this.responseBodyEmitter.send("["); + } + catch (IOException e) { + // Urgh? + } + } + this.subscription = subscription; + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T value) { + + Object object = value; + + try { + if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { + if (!this.firstElementWritten) { + this.firstElementWritten = true; + } + else { + responseBodyEmitter.send(","); + } + if (value.getClass()==String.class) { + object = "\"" + value + "\""; + } + } + if (!completed) { + responseBodyEmitter.send(object, mediaType); + } + } + catch ( + + IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public void onError(Throwable e) { + responseBodyEmitter.completeWithError(e); + } + + @Override + public void onComplete() { + if (!completed) { + completed = true; + try { + if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { + if (!this.firstElementWritten) { + + this.firstElementWritten = true; + } + else { + responseBodyEmitter.send("]"); + } + } + } + catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + responseBodyEmitter.complete(); + } + } + + @Override + public void run() { + this.subscription.cancel(); + } +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java index 3165b417f..34475a1d0 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java @@ -47,17 +47,17 @@ import reactor.core.publisher.Flux; @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) public class RestApplicationTests { + private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); @LocalServerPort private int port; private TestRestTemplate rest = new TestRestTemplate(); @Test public void wordsSSE() throws Exception { - assertThat( - rest.exchange( - RequestEntity.get(new URI("http://localhost:" + port + "/words")) - .accept(MediaType.TEXT_EVENT_STREAM).build(), - String.class).getBody()).isEqualTo(sse("foo", "bar")); + assertThat(rest.exchange( + RequestEntity.get(new URI("http://localhost:" + port + "/words")) + .accept(EVENT_STREAM).build(), + String.class).getBody()).isEqualTo(sse("foo", "bar")); } @Test @@ -100,17 +100,17 @@ public class RestApplicationTests { .exchange( RequestEntity.post(new URI("http://localhost:" + port + "/maps")) .contentType(MediaType.APPLICATION_JSON) - .body("{\"value\":\"foo\"}{\"value\":\"bar\"}"), + // TODO: make this work without newline separator + .body("{\"value\":\"foo\"}\n{\"value\":\"bar\"}"), String.class) .getBody()).isEqualTo("{\"value\":\"FOO\"}{\"value\":\"BAR\"}"); } @Test public void uppercaseSSE() throws Exception { - assertThat(rest.exchange( - RequestEntity.post(new URI("http://localhost:" + port + "/uppercase")) - .accept(MediaType.TEXT_EVENT_STREAM) - .contentType(MediaType.TEXT_EVENT_STREAM).body(sse("foo", "bar")), + assertThat(rest.exchange(RequestEntity + .post(new URI("http://localhost:" + port + "/uppercase")) + .accept(EVENT_STREAM).contentType(EVENT_STREAM).body(sse("foo", "bar")), String.class).getBody()).isEqualTo(sse("[FOO]", "[BAR]")); } @@ -123,7 +123,7 @@ public class RestApplicationTests { @Bean public Function, Flux> uppercase() { - return flux -> flux.map(value -> "[" + value.trim().toUpperCase() + "]"); + return flux -> flux.log().map(value -> "[" + value.trim().toUpperCase() + "]"); } @Bean diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverterTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverterTests.java new file mode 100644 index 000000000..ee936f543 --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverterTests.java @@ -0,0 +1,108 @@ +/* + * Copyright 2012-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import org.junit.Test; + +import org.springframework.http.MediaType; +import org.springframework.mock.http.MockHttpInputMessage; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; + +/** + * @author Dave Syer + * + */ +public class FluxHttpMessageConverterTests { + + private FluxHttpMessageConverter converter = new FluxHttpMessageConverter(); + + private Class> type = null; + + @Test + public void newlines() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage("foo\nbar".getBytes()); + assertThat(converter.read(type, message).collectList().block()).contains("foo", + "bar"); + } + + @Test + public void sse() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "data:foo\n\ndata:bar".getBytes()); + message.getHeaders().setContentType(MediaType.valueOf("text/event-stream")); + assertThat(converter.read(type, message).collectList().block()).contains("foo", + "bar"); + } + + @Test + public void jsonStream() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "{\"value\":\"foo\"}{\"value\":\"barrier\"}".getBytes()); + message.getHeaders().setContentType(MediaType.APPLICATION_JSON); + assertThat(converter.read(type, message).collectList().block()) + .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); + } + + @Test + public void jsonStreamWhitespace() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "{\"value\":\"foo\"} {\"value\":\"barrier\"} ".getBytes()); + message.getHeaders().setContentType(MediaType.APPLICATION_JSON); + assertThat(converter.read(type, message).collectList().block()) + .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); + } + + @Test + public void jsonStreamNewline() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "{\"value\":\"foo\"}\n{\"value\":\"barrier\"}".getBytes()); + message.getHeaders().setContentType(MediaType.APPLICATION_JSON); + assertThat(converter.read(type, message).collectList().block()) + .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); + } + + @Test + public void jsonArray() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "[{\"value\":\"foo\"},{\"value\":\"barrier\"}]".getBytes()); + message.getHeaders().setContentType(MediaType.APPLICATION_JSON); + assertThat(converter.read(type, message).collectList().block()) + .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); + } + + @Test + public void jsonArrayWhitespace() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "[{\"value\":\"foo\"}, {\"value\":\"barrier\"}] ".getBytes()); + message.getHeaders().setContentType(MediaType.APPLICATION_JSON); + assertThat(converter.read(type, message).collectList().block()) + .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); + } + + @Test + public void jsonArrayNewline() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "[{\"value\":\"foo\"},\n{\"value\":\"barrier\"}]".getBytes()); + message.getHeaders().setContentType(MediaType.APPLICATION_JSON); + assertThat(converter.read(type, message).collectList().block()) + .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); + } + +}