diff --git a/spring-cloud-function-context/pom.xml b/spring-cloud-function-context/pom.xml index 757fe4d27..863e28b3c 100644 --- a/spring-cloud-function-context/pom.xml +++ b/spring-cloud-function-context/pom.xml @@ -37,18 +37,6 @@ - - - - org.springframework.boot - spring-boot-dependencies - 2.0.0.BUILD-SNAPSHOT - pom - import - - - - diff --git a/spring-cloud-function-deployer/pom.xml b/spring-cloud-function-deployer/pom.xml index dccb0937b..f8daf6a0e 100644 --- a/spring-cloud-function-deployer/pom.xml +++ b/spring-cloud-function-deployer/pom.xml @@ -54,20 +54,6 @@ - - org.springframework.boot - spring-boot-dependencies - 2.0.0.BUILD-SNAPSHOT - pom - import - - - org.springframework.boot.experimental - spring-boot-dependencies-web-reactive - 0.1.0.BUILD-SNAPSHOT - pom - import - org.springframework.cloud spring-cloud-function-parent diff --git a/spring-cloud-function-samples/spring-cloud-function-sample-pojo/pom.xml b/spring-cloud-function-samples/spring-cloud-function-sample-pojo/pom.xml index fb2b1973a..620f889ba 100644 --- a/spring-cloud-function-samples/spring-cloud-function-sample-pojo/pom.xml +++ b/spring-cloud-function-samples/spring-cloud-function-sample-pojo/pom.xml @@ -7,13 +7,13 @@ function-sample-pojo 1.0.0.BUILD-SNAPSHOT jar - spring-cloud-function-sample + spring-cloud-function-sample-pojo Spring Cloud Function Web Support org.springframework.boot spring-boot-starter-parent - 2.0.0.BUILD-SNAPSHOT + 1.5.0.BUILD-SNAPSHOT @@ -41,18 +41,6 @@ - - - - org.springframework.boot - spring-boot-dependencies - 2.0.0.BUILD-SNAPSHOT - pom - import - - - - diff --git a/spring-cloud-function-samples/spring-cloud-function-sample/pom.xml b/spring-cloud-function-samples/spring-cloud-function-sample/pom.xml index c2d277cc1..5940479cd 100644 --- a/spring-cloud-function-samples/spring-cloud-function-sample/pom.xml +++ b/spring-cloud-function-samples/spring-cloud-function-sample/pom.xml @@ -13,7 +13,7 @@ org.springframework.boot spring-boot-starter-parent - 2.0.0.BUILD-SNAPSHOT + 1.5.0.BUILD-SNAPSHOT @@ -46,18 +46,6 @@ - - - - org.springframework.boot - spring-boot-dependencies - 2.0.0.BUILD-SNAPSHOT - pom - import - - - - diff --git a/spring-cloud-function-web/pom.xml b/spring-cloud-function-web/pom.xml index b5a3b0bba..6f6d16ba7 100644 --- a/spring-cloud-function-web/pom.xml +++ b/spring-cloud-function-web/pom.xml @@ -20,8 +20,8 @@ - org.springframework.boot.experimental - spring-boot-starter-web-reactive + org.springframework.boot + spring-boot-starter-web org.springframework.cloud @@ -34,25 +34,6 @@ - - - - org.springframework.boot - spring-boot-dependencies - 2.0.0.BUILD-SNAPSHOT - pom - import - - - org.springframework.boot.experimental - spring-boot-dependencies-web-reactive - 0.1.0.BUILD-SNAPSHOT - pom - import - - - - diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java index 42646d51c..117043345 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java @@ -20,7 +20,6 @@ import java.util.function.Function; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.boot.context.embedded.ReactiveServerProperties; import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; @@ -35,7 +34,7 @@ import reactor.core.publisher.Flux; * */ @RestController -@ConditionalOnClass({ RestController.class, ReactiveServerProperties.class }) +@ConditionalOnClass(RestController.class) public class FunctionController { @Value("${debug:${DEBUG:false}}") diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java index 262bf7d26..aea2fd5a3 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,311 +16,16 @@ package org.springframework.cloud.function.web; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.function.Function; - -import org.apache.commons.io.Charsets; -import org.reactivestreams.Publisher; - import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.core.ResolvableType; -import org.springframework.core.codec.AbstractDecoder; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.http.codec.DecoderHttpMessageReader; -import org.springframework.http.codec.HttpMessageReader; -import org.springframework.http.codec.ServerSentEventHttpMessageReader; -import org.springframework.util.MimeType; -import org.springframework.util.StreamUtils; -import org.springframework.web.reactive.config.WebReactiveConfigurer; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; -import reactor.core.publisher.Flux; /** * @author Mark Fisher */ @SpringBootApplication -public class RestApplication implements WebReactiveConfigurer { - - @Override - public void extendMessageReaders(List> readers) { - readers.add(0, new ServerSentEventHttpMessageReader()); - // Instead of the default JSON decoder we want to keep the Strings unparsed - readers.add(1, new DecoderHttpMessageReader<>(new JsonObjectDecoder())); - } +public class RestApplication { public static void main(String[] args) { SpringApplication.run(RestApplication.class, args); } } - -/** - * Decode an arbitrary split byte stream representing JSON objects to a byte stream where - * each chunk is a well-formed JSON object. - * - *

- * This class does not do any real parsing or validation. A sequence of bytes is - * considered a JSON object/array if it contains a matching number of opening and closing - * braces/brackets. - * - *

- * Based on Netty - * JsonObjectDecoder - * - * @author Sebastien Deleuze - * - */ -// Copied from spring-web -class JsonObjectDecoder extends AbstractDecoder { - - private static final int ST_CORRUPTED = -1; - - private static final int ST_INIT = 0; - - private static final int ST_DECODING_NORMAL = 1; - - private static final int ST_DECODING_ARRAY_STREAM = 2; - - private final int maxObjectLength; - - private final boolean streamArrayElements; - - public JsonObjectDecoder() { - // 1 MB - this(1024 * 1024); - } - - public JsonObjectDecoder(int maxObjectLength) { - this(maxObjectLength, true); - } - - public JsonObjectDecoder(boolean streamArrayElements) { - this(1024 * 1024, streamArrayElements); - } - - /** - * @param maxObjectLength maximum number of bytes a JSON object/array may use - * (including braces and all). Objects exceeding this length are dropped and an - * {@link IllegalStateException} is thrown. - * @param streamArrayElements if set to true and the "top level" JSON object is an - * array, each of its entries is passed through the pipeline individually and - * immediately after it was fully received, allowing for arrays with - */ - public JsonObjectDecoder(int maxObjectLength, boolean streamArrayElements) { - super(new MimeType("application", "json", StandardCharsets.UTF_8), - new MimeType("application", "*+json", StandardCharsets.UTF_8)); - if (maxObjectLength < 1) { - throw new IllegalArgumentException("maxObjectLength must be a positive int"); - } - this.maxObjectLength = maxObjectLength; - this.streamArrayElements = streamArrayElements; - } - - @Override - public Flux decode(Publisher inputStream, - ResolvableType elementType, MimeType mimeType, Map hints) { - - return Flux.from(inputStream) - .flatMap(new Function>() { - - int openBraces; - int index; - int state; - boolean insideString; - ByteBuf input; - Integer writerIndex; - - @Override - public Publisher apply(DataBuffer buffer) { - List chunks = new ArrayList<>(); - if (this.input == null) { - this.input = Unpooled.copiedBuffer(buffer.asByteBuffer()); - DataBufferUtils.release(buffer); - this.writerIndex = this.input.writerIndex(); - } - else { - this.index = this.index - this.input.readerIndex(); - this.input = Unpooled.copiedBuffer(this.input, - Unpooled.copiedBuffer(buffer.asByteBuffer())); - DataBufferUtils.release(buffer); - this.writerIndex = this.input.writerIndex(); - } - if (this.state == ST_CORRUPTED) { - this.input.skipBytes(this.input.readableBytes()); - return Flux - .error(new IllegalStateException("Corrupted stream")); - } - if (this.writerIndex > maxObjectLength) { - // buffer size exceeded maxObjectLength; discarding the - // complete buffer. - this.input.skipBytes(this.input.readableBytes()); - reset(); - return Flux.error(new IllegalStateException( - "object length exceeds " + maxObjectLength + ": " - + this.writerIndex + " bytes discarded")); - } - DataBufferFactory dataBufferFactory = buffer.factory(); - for (/* use current index */; this.index < this.writerIndex; this.index++) { - byte c = this.input.getByte(this.index); - if (this.state == ST_DECODING_NORMAL) { - decodeByte(c, this.input, this.index); - - // All opening braces/brackets have been closed. That's - // enough to conclude - // that the JSON object/array is complete. - if (this.openBraces == 0) { - ByteBuf json = extractObject(this.input, - this.input.readerIndex(), - this.index + 1 - this.input.readerIndex()); - if (json != null) { - chunks.add( - dataBufferFactory.wrap(json.nioBuffer())); - } - - // The JSON object/array was extracted => discard the - // bytes from - // the input buffer. - this.input.readerIndex(this.index + 1); - // Reset the object state to get ready for the next - // JSON object/text - // coming along the byte stream. - reset(); - } - } - else if (this.state == ST_DECODING_ARRAY_STREAM) { - decodeByte(c, this.input, this.index); - - if (!this.insideString - && (this.openBraces == 1 && c == ',' - || this.openBraces == 0 && c == ']')) { - // skip leading spaces. No range check is needed and - // the loop will terminate - // because the byte at position index is not a - // whitespace. - for (int i = this.input.readerIndex(); Character - .isWhitespace(this.input.getByte(i)); i++) { - this.input.skipBytes(1); - } - - // skip trailing spaces. - int idxNoSpaces = this.index - 1; - while (idxNoSpaces >= this.input.readerIndex() - && Character.isWhitespace( - this.input.getByte(idxNoSpaces))) { - - idxNoSpaces--; - } - - ByteBuf json = extractObject(this.input, - this.input.readerIndex(), - idxNoSpaces + 1 - this.input.readerIndex()); - - if (json != null) { - chunks.add( - dataBufferFactory.wrap(json.nioBuffer())); - } - - this.input.readerIndex(this.index + 1); - - if (c == ']') { - reset(); - } - } - // JSON object/array detected. Accumulate bytes until all - // braces/brackets are closed. - } - else if (c == '{' || c == '[') { - initDecoding(c, streamArrayElements); - - if (this.state == ST_DECODING_ARRAY_STREAM) { - // Discard the array bracket - this.input.skipBytes(1); - } - // Discard leading spaces in front of a JSON object/array. - } - else if (Character.isWhitespace(c)) { - this.input.skipBytes(1); - } - else { - this.state = ST_CORRUPTED; - return Flux.error(new IllegalStateException( - "invalid JSON received at byte position " - + this.index + ": " - + ByteBufUtil.hexDump(this.input))); - } - } - - return Flux.fromIterable(chunks).map(buf -> { - try { - return StreamUtils.copyToString(buf.asInputStream(), - Charsets.UTF_8); - } - catch (IOException e) { - throw new IllegalStateException( - "Cannot convert buffer to String" + buf, e); - } - }); - } - - /** - * Override this method if you want to filter the json objects/arrays - * that get passed through the pipeline. - */ - protected ByteBuf extractObject(ByteBuf buffer, int index, - int length) { - return buffer.slice(index, length).retain(); - } - - private void decodeByte(byte c, ByteBuf input, int index) { - if ((c == '{' || c == '[') && !this.insideString) { - this.openBraces++; - } - else if ((c == '}' || c == ']') && !this.insideString) { - this.openBraces--; - } - else if (c == '"') { - // start of a new JSON string. It's necessary to detect - // strings as they may - // also contain braces/brackets and that could lead to - // incorrect results. - if (!this.insideString) { - this.insideString = true; - // If the double quote wasn't escaped then this is the end - // of a string. - } - else if (input.getByte(index - 1) != '\\') { - this.insideString = false; - } - } - } - - private void initDecoding(byte openingBrace, - boolean streamArrayElements) { - this.openBraces = 1; - if (openingBrace == '[' && streamArrayElements) { - this.state = ST_DECODING_ARRAY_STREAM; - } - else { - this.state = ST_DECODING_NORMAL; - } - } - - private void reset() { - this.insideString = false; - this.state = ST_INIT; - this.openBraces = 0; - } - }); - } - -} \ No newline at end of file diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverter.java new file mode 100644 index 000000000..7db689bf5 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverter.java @@ -0,0 +1,323 @@ +/* + * Copyright 2012-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.springframework.http.HttpInputMessage; +import org.springframework.http.HttpOutputMessage; +import org.springframework.http.MediaType; +import org.springframework.http.converter.HttpMessageConverter; +import org.springframework.http.converter.HttpMessageNotReadableException; +import org.springframework.http.converter.HttpMessageNotWritableException; + +import reactor.core.publisher.Flux; + +/** + * Converter for request bodies of type Flux. + * + * @author Dave Syer + * + */ +public class FluxHttpMessageConverter implements HttpMessageConverter> { + + private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); + + @Override + public boolean canRead(Class clazz, MediaType mediaType) { + return Flux.class.isAssignableFrom(clazz); + } + + @Override + public boolean canWrite(Class clazz, MediaType mediaType) { + return false; + } + + @Override + public List getSupportedMediaTypes() { + return Arrays.asList(MediaType.ALL); + } + + @Override + public Flux read(Class> clazz, + HttpInputMessage inputMessage) + throws IOException, HttpMessageNotReadableException { + + MediaType mediaType = inputMessage.getHeaders().getContentType(); + if (mediaType != null) { + if (mediaType.includes(MediaType.APPLICATION_JSON)) { + return new JsonObjectDecoder().decode(inputMessage.getBody()); + } + if (mediaType.includes(EVENT_STREAM)) { + return splitOnSseData(inputMessage); + } + } + + return splitOnLineEndings(inputMessage); + } + + private Flux splitOnLineEndings(HttpInputMessage inputMessage) { + return Flux.create(sink -> { + BufferedReader reader; + try { + reader = new BufferedReader( + new InputStreamReader(inputMessage.getBody())); + String line = reader.readLine(); + while (line != null) { + sink.next(line); + line = reader.readLine(); + } + } + catch (IOException e) { + sink.error(e); + } + sink.complete(); + }); + } + + private Flux splitOnSseData(HttpInputMessage inputMessage) { + return Flux.create(sink -> { + BufferedReader reader; + StringBuffer buffer = new StringBuffer(); + int emptyCount = 0; + try { + reader = new BufferedReader( + new InputStreamReader(inputMessage.getBody())); + String line = reader.readLine(); + while (line != null) { + if (line.length() == 0) { + emptyCount++; + } + else { + if (buffer.length() == 0) { + if (line.startsWith("data:")) { + line = line.length() > "data:".length() + ? line.substring("data:".length()) : ""; + } + } + else { + buffer.append("\n"); + } + buffer.append(line); + } + if (emptyCount > 0) { + sink.next(buffer.toString()); + buffer.setLength(0); + emptyCount = 0; + while (line != null && line.length() == 0) { + line = reader.readLine(); + } + } + else { + line = reader.readLine(); + } + } + if (buffer.length()>0) { + sink.next(buffer.toString()); + } + } + catch (IOException e) { + sink.error(e); + } + sink.complete(); + }); + } + + @Override + public void write(Flux t, MediaType contentType, + HttpOutputMessage outputMessage) + throws IOException, HttpMessageNotWritableException { + } + + static class JsonObjectDecoder { + + private static final int ST_CORRUPTED = -1; + + private static final int ST_INIT = 0; + + private static final int ST_DECODING_NORMAL = 1; + + private static final int ST_DECODING_ARRAY_STREAM = 2; + + private final int maxObjectLength = 1024 * 1024; + + private int openBraces; + private int state; + private boolean insideString; + private int writerIndex; + private boolean streamArrayElements = true; + + public Flux decode(InputStream body) { + InputStreamReader reader = new InputStreamReader(body); + char[] buffer = new char[1024]; + try { + List chunks = new ArrayList<>(); + int read = reader.read(buffer); + this.writerIndex += read; + while (read >= 0) { + if (this.state == ST_CORRUPTED) { + return Flux.error(new IllegalStateException("Corrupted stream")); + } + if (this.writerIndex > maxObjectLength) { + // buffer size exceeded maxObjectLength; discarding the complete + // buffer. + reset(); + return Flux.error(new IllegalStateException( + "object length exceeds " + maxObjectLength + ": " + + this.writerIndex + " bytes discarded")); + } + int point = 0; + for (int index = 0; index < read; index++) { + char c = buffer[index]; + if (this.state == ST_DECODING_NORMAL) { + decodeByte(c, buffer, index); + + // All opening braces/brackets have been closed. That's enough + // to conclude that the JSON object/array is complete. + if (this.openBraces == 0) { + char[] json = extractObject(buffer, point, + index + 1 - point); + if (json != null) { + chunks.add(new String(json)); + } + + // The JSON object/array was extracted => discard the + // bytes from the input buffer. + point += index + 1 - point; + // Reset the object state to get ready for the next JSON + // object/text coming along the byte stream. + reset(); + } + } + else if (this.state == ST_DECODING_ARRAY_STREAM) { + decodeByte(c, buffer, index); + + if (!this.insideString && (this.openBraces == 1 && c == ',' + || this.openBraces == 0 && c == ']')) { + // skip leading spaces. No range check is needed and the + // loop will terminate because the byte at position index + // is not a whitespace. + for (int i = point; Character + .isWhitespace(buffer[i]); i++) { + point++; + } + + // skip trailing spaces. + int idxNoSpaces = index - 1; + while (idxNoSpaces >= 0 + && Character.isWhitespace(buffer[idxNoSpaces])) { + idxNoSpaces--; + } + + char[] json = extractObject(buffer, point, + idxNoSpaces + 1 - point); + if (json != null) { + chunks.add(new String(json)); + } + + point += index + 1 - point; + + if (c == ']') { + reset(); + } + } + // JSON object/array detected. Accumulate bytes until all + // braces/brackets are closed. + } + else if (c == '{' || c == '[') { + initDecoding(c, this.streamArrayElements); + + if (this.state == ST_DECODING_ARRAY_STREAM) { + // Discard the array bracket + point++; + } + // Discard leading spaces in front of a JSON object/array. + } + else if (Character.isWhitespace(c)) { + point++; + } + else { + this.state = ST_CORRUPTED; + return Flux.error(new IllegalStateException( + "invalid JSON received at byte position " + + writerIndex)); + } + } + read = reader.read(buffer); + } + + return Flux.fromIterable(chunks); + } + catch (IOException e) { + return Flux.error(new IllegalStateException("Cannot read stream", e)); + } + } + + private char[] extractObject(char[] buffer, int index, int length) { + if (length <= 0) { + return null; + } + return Arrays.copyOfRange(buffer, index, index + length); + } + + private void decodeByte(char c, char[] input, int index) { + if ((c == '{' || c == '[') && !this.insideString) { + this.openBraces++; + } + else if ((c == '}' || c == ']') && !this.insideString) { + this.openBraces--; + } + else if (c == '"') { + // start of a new JSON string. It's necessary to detect strings as they + // may also contain braces/brackets and that could lead to incorrect + // results. + if (!this.insideString) { + this.insideString = true; + // If the double quote wasn't escaped then this is the end of a + // string. + } + else if (input[index - 1] != '\\') { + this.insideString = false; + } + } + } + + private void initDecoding(char openingBrace, boolean streamArrayElements) { + this.openBraces = 1; + if (openingBrace == '[' && streamArrayElements) { + this.state = ST_DECODING_ARRAY_STREAM; + } + else { + this.state = ST_DECODING_NORMAL; + } + } + + private void reset() { + this.insideString = false; + this.state = ST_INIT; + this.openBraces = 0; + } + + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java new file mode 100644 index 000000000..ae449caa3 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java @@ -0,0 +1,41 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import org.springframework.http.MediaType; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; + +import reactor.core.publisher.Flux; + +/** + * A specialized {@link ResponseBodyEmitter} that handles {@link Flux} return types. + * + * @author Dave Syer + */ +class FluxResponseBodyEmitter extends ResponseBodyEmitter { + + public FluxResponseBodyEmitter(Flux observable) { + this(null, null, observable); + } + + public FluxResponseBodyEmitter(Long timeout, MediaType mediaType, + Flux observable) { + super(timeout); + new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java new file mode 100644 index 000000000..54b55911d --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java @@ -0,0 +1,42 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import org.springframework.http.MediaType; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import reactor.core.publisher.Flux; + +/** + * A specialized {@link ResponseBodyEmitter} that handles {@link Flux} return types with + * SSE streams. + * + * @author Dave Syer + */ +class FluxResponseSseEmitter extends SseEmitter { + + public FluxResponseSseEmitter(Flux observable) { + this(null, MediaType.valueOf("text/event-stream"), observable); + } + + public FluxResponseSseEmitter(Long timeout, MediaType mediaType, Flux observable) { + super(timeout); + new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java new file mode 100644 index 000000000..f469f0d96 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java @@ -0,0 +1,93 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import java.util.List; + +import org.springframework.core.MethodParameter; +import org.springframework.core.ResolvableType; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.http.converter.HttpMessageConverter; +import org.springframework.web.context.request.NativeWebRequest; +import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; +import org.springframework.web.method.support.ModelAndViewContainer; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler; + +import reactor.core.publisher.Flux; + +/** + * A specialized {@link AsyncHandlerMethodReturnValueHandler} that handles {@link Flux} + * return types. + * + * @author Dave Syer + */ +public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHandler { + + private ResponseBodyEmitterReturnValueHandler delegate; + private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); + + public FluxReturnValueHandler(List> messageConverters) { + delegate = new ResponseBodyEmitterReturnValueHandler(messageConverters); + } + + @Override + public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { + return returnValue != null && supportsReturnType(returnType); + } + + @Override + public boolean supportsReturnType(MethodParameter returnType) { + return Flux.class.isAssignableFrom(returnType.getParameterType()) + || isResponseEntity(returnType); + } + + private boolean isResponseEntity(MethodParameter returnType) { + if (ResponseEntity.class.isAssignableFrom(returnType.getParameterType())) { + Class bodyType = ResolvableType.forMethodParameter(returnType) + .getGeneric(0).resolve(); + return bodyType != null && Flux.class.isAssignableFrom(bodyType); + } + return false; + } + + @Override + public void handleReturnValue(Object returnValue, MethodParameter returnType, + ModelAndViewContainer mavContainer, NativeWebRequest webRequest) + throws Exception { + Object adaptFrom = returnValue; + if (returnValue instanceof ResponseEntity) { + adaptFrom = ((ResponseEntity) returnValue).getBody(); + } + Flux flux = (Flux) adaptFrom; + + MediaType mediaType = webRequest.getHeader("Accept") == null ? null + : MediaType.parseMediaTypes(webRequest.getHeader("Accept")).iterator() + .next(); + delegate.handleReturnValue(getEmitter(1000L, flux, mediaType), + returnType, mavContainer, webRequest); + } + + private ResponseBodyEmitter getEmitter(Long timeout, Flux flux, MediaType mediaType) { + if (EVENT_STREAM.isCompatibleWith(mediaType)) { + return new FluxResponseSseEmitter<>(timeout, mediaType, flux); + } + return new FluxResponseBodyEmitter<>(timeout, mediaType, flux); + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java new file mode 100644 index 000000000..271c52a2f --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java @@ -0,0 +1,64 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication; +import org.springframework.boot.autoconfigure.web.HttpMessageConverters; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; +import org.springframework.web.method.support.HandlerMethodReturnValueHandler; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter; + +import reactor.core.publisher.Flux; + +/** + * @author Dave Syer + */ +@Configuration +@ConditionalOnWebApplication +@ConditionalOnClass({ Flux.class, AsyncHandlerMethodReturnValueHandler.class }) +public class ReactorAutoConfiguration extends WebMvcConfigurerAdapter { + + @Autowired + private FluxReturnValueHandler returnValueHandler; + + @Bean + public FluxReturnValueHandler fluxReturnValueHandler( + HttpMessageConverters converters) { + return new FluxReturnValueHandler(converters.getConverters()); + } + + @Configuration + protected static class MessageConverters { + @Bean + public HttpMessageConverters httpMessageConverters() { + return new HttpMessageConverters(new FluxHttpMessageConverter()); + } + } + + @Override + public void addReturnValueHandlers( + List returnValueHandlers) { + returnValueHandlers.add(returnValueHandler); + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java new file mode 100644 index 000000000..f06130039 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java @@ -0,0 +1,130 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import java.io.IOException; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import org.springframework.http.MediaType; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; + +import reactor.core.publisher.Flux; + +/** + * Subscriber that emits any value produced by the {@link Flux} into the delegated + * {@link ResponseBodyEmitter}. + * + * @author Dave Syer + */ +class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { + + private final MediaType mediaType; + + private Subscription subscription; + + private final ResponseBodyEmitter responseBodyEmitter; + + private boolean completed; + + private boolean firstElementWritten; + + public ResponseBodyEmitterSubscriber(MediaType mediaType, Flux observable, + ResponseBodyEmitter responseBodyEmitter) { + + this.mediaType = mediaType; + this.responseBodyEmitter = responseBodyEmitter; + this.responseBodyEmitter.onTimeout(this); + this.responseBodyEmitter.onCompletion(this); + observable.subscribe(this); + } + + @Override + public void onSubscribe(Subscription subscription) { + if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { + try { + this.responseBodyEmitter.send("["); + } + catch (IOException e) { + // Urgh? + } + } + this.subscription = subscription; + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T value) { + + Object object = value; + + try { + if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { + if (!this.firstElementWritten) { + this.firstElementWritten = true; + } + else { + responseBodyEmitter.send(","); + } + if (value.getClass()==String.class) { + object = "\"" + value + "\""; + } + } + if (!completed) { + responseBodyEmitter.send(object, mediaType); + } + } + catch ( + + IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public void onError(Throwable e) { + responseBodyEmitter.completeWithError(e); + } + + @Override + public void onComplete() { + if (!completed) { + completed = true; + try { + if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { + if (!this.firstElementWritten) { + + this.firstElementWritten = true; + } + else { + responseBodyEmitter.send("]"); + } + } + } + catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + responseBodyEmitter.complete(); + } + } + + @Override + public void run() { + this.subscription.cancel(); + } +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java index 3165b417f..34475a1d0 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java @@ -47,17 +47,17 @@ import reactor.core.publisher.Flux; @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) public class RestApplicationTests { + private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); @LocalServerPort private int port; private TestRestTemplate rest = new TestRestTemplate(); @Test public void wordsSSE() throws Exception { - assertThat( - rest.exchange( - RequestEntity.get(new URI("http://localhost:" + port + "/words")) - .accept(MediaType.TEXT_EVENT_STREAM).build(), - String.class).getBody()).isEqualTo(sse("foo", "bar")); + assertThat(rest.exchange( + RequestEntity.get(new URI("http://localhost:" + port + "/words")) + .accept(EVENT_STREAM).build(), + String.class).getBody()).isEqualTo(sse("foo", "bar")); } @Test @@ -100,17 +100,17 @@ public class RestApplicationTests { .exchange( RequestEntity.post(new URI("http://localhost:" + port + "/maps")) .contentType(MediaType.APPLICATION_JSON) - .body("{\"value\":\"foo\"}{\"value\":\"bar\"}"), + // TODO: make this work without newline separator + .body("{\"value\":\"foo\"}\n{\"value\":\"bar\"}"), String.class) .getBody()).isEqualTo("{\"value\":\"FOO\"}{\"value\":\"BAR\"}"); } @Test public void uppercaseSSE() throws Exception { - assertThat(rest.exchange( - RequestEntity.post(new URI("http://localhost:" + port + "/uppercase")) - .accept(MediaType.TEXT_EVENT_STREAM) - .contentType(MediaType.TEXT_EVENT_STREAM).body(sse("foo", "bar")), + assertThat(rest.exchange(RequestEntity + .post(new URI("http://localhost:" + port + "/uppercase")) + .accept(EVENT_STREAM).contentType(EVENT_STREAM).body(sse("foo", "bar")), String.class).getBody()).isEqualTo(sse("[FOO]", "[BAR]")); } @@ -123,7 +123,7 @@ public class RestApplicationTests { @Bean public Function, Flux> uppercase() { - return flux -> flux.map(value -> "[" + value.trim().toUpperCase() + "]"); + return flux -> flux.log().map(value -> "[" + value.trim().toUpperCase() + "]"); } @Bean diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverterTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverterTests.java new file mode 100644 index 000000000..ee936f543 --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/FluxHttpMessageConverterTests.java @@ -0,0 +1,108 @@ +/* + * Copyright 2012-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.flux; + +import org.junit.Test; + +import org.springframework.http.MediaType; +import org.springframework.mock.http.MockHttpInputMessage; + +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; + +/** + * @author Dave Syer + * + */ +public class FluxHttpMessageConverterTests { + + private FluxHttpMessageConverter converter = new FluxHttpMessageConverter(); + + private Class> type = null; + + @Test + public void newlines() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage("foo\nbar".getBytes()); + assertThat(converter.read(type, message).collectList().block()).contains("foo", + "bar"); + } + + @Test + public void sse() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "data:foo\n\ndata:bar".getBytes()); + message.getHeaders().setContentType(MediaType.valueOf("text/event-stream")); + assertThat(converter.read(type, message).collectList().block()).contains("foo", + "bar"); + } + + @Test + public void jsonStream() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "{\"value\":\"foo\"}{\"value\":\"barrier\"}".getBytes()); + message.getHeaders().setContentType(MediaType.APPLICATION_JSON); + assertThat(converter.read(type, message).collectList().block()) + .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); + } + + @Test + public void jsonStreamWhitespace() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "{\"value\":\"foo\"} {\"value\":\"barrier\"} ".getBytes()); + message.getHeaders().setContentType(MediaType.APPLICATION_JSON); + assertThat(converter.read(type, message).collectList().block()) + .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); + } + + @Test + public void jsonStreamNewline() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "{\"value\":\"foo\"}\n{\"value\":\"barrier\"}".getBytes()); + message.getHeaders().setContentType(MediaType.APPLICATION_JSON); + assertThat(converter.read(type, message).collectList().block()) + .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); + } + + @Test + public void jsonArray() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "[{\"value\":\"foo\"},{\"value\":\"barrier\"}]".getBytes()); + message.getHeaders().setContentType(MediaType.APPLICATION_JSON); + assertThat(converter.read(type, message).collectList().block()) + .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); + } + + @Test + public void jsonArrayWhitespace() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "[{\"value\":\"foo\"}, {\"value\":\"barrier\"}] ".getBytes()); + message.getHeaders().setContentType(MediaType.APPLICATION_JSON); + assertThat(converter.read(type, message).collectList().block()) + .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); + } + + @Test + public void jsonArrayNewline() throws Exception { + MockHttpInputMessage message = new MockHttpInputMessage( + "[{\"value\":\"foo\"},\n{\"value\":\"barrier\"}]".getBytes()); + message.getHeaders().setContentType(MediaType.APPLICATION_JSON); + assertThat(converter.read(type, message).collectList().block()) + .contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}"); + } + +}