Add MVC body processors to handle Flux

We don't need to cover all the possible uses of Flux (only
Flux<String> really), so this isn't comprehensive coverage of
all the features in Spring WebFlux, but it's good enough for
functions to run with Spring Boot 1.5.
This commit is contained in:
Dave Syer
2017-01-11 10:39:43 -05:00
committed by markfisher
parent 0fb31d6d2b
commit 216e5c9207
15 changed files with 820 additions and 384 deletions

View File

@@ -20,7 +20,6 @@ import java.util.function.Function;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.embedded.ReactiveServerProperties;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@@ -35,7 +34,7 @@ import reactor.core.publisher.Flux;
*
*/
@RestController
@ConditionalOnClass({ RestController.class, ReactiveServerProperties.class })
@ConditionalOnClass(RestController.class)
public class FunctionController {
@Value("${debug:${DEBUG:false}}")

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,311 +16,16 @@
package org.springframework.cloud.function.web;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.commons.io.Charsets;
import org.reactivestreams.Publisher;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.AbstractDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.codec.DecoderHttpMessageReader;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.ServerSentEventHttpMessageReader;
import org.springframework.util.MimeType;
import org.springframework.util.StreamUtils;
import org.springframework.web.reactive.config.WebReactiveConfigurer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import reactor.core.publisher.Flux;
/**
* @author Mark Fisher
*/
@SpringBootApplication
public class RestApplication implements WebReactiveConfigurer {
@Override
public void extendMessageReaders(List<HttpMessageReader<?>> readers) {
readers.add(0, new ServerSentEventHttpMessageReader());
// Instead of the default JSON decoder we want to keep the Strings unparsed
readers.add(1, new DecoderHttpMessageReader<>(new JsonObjectDecoder()));
}
public class RestApplication {
public static void main(String[] args) {
SpringApplication.run(RestApplication.class, args);
}
}
/**
* Decode an arbitrary split byte stream representing JSON objects to a byte stream where
* each chunk is a well-formed JSON object.
*
* <p>
* This class does not do any real parsing or validation. A sequence of bytes is
* considered a JSON object/array if it contains a matching number of opening and closing
* braces/brackets.
*
* <p>
* Based on <a href=
* "https://github.com/netty/netty/blob/master/codec/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java">Netty
* JsonObjectDecoder</a>
*
* @author Sebastien Deleuze
*
*/
// Copied from spring-web
class JsonObjectDecoder extends AbstractDecoder<String> {
private static final int ST_CORRUPTED = -1;
private static final int ST_INIT = 0;
private static final int ST_DECODING_NORMAL = 1;
private static final int ST_DECODING_ARRAY_STREAM = 2;
private final int maxObjectLength;
private final boolean streamArrayElements;
public JsonObjectDecoder() {
// 1 MB
this(1024 * 1024);
}
public JsonObjectDecoder(int maxObjectLength) {
this(maxObjectLength, true);
}
public JsonObjectDecoder(boolean streamArrayElements) {
this(1024 * 1024, streamArrayElements);
}
/**
* @param maxObjectLength maximum number of bytes a JSON object/array may use
* (including braces and all). Objects exceeding this length are dropped and an
* {@link IllegalStateException} is thrown.
* @param streamArrayElements if set to true and the "top level" JSON object is an
* array, each of its entries is passed through the pipeline individually and
* immediately after it was fully received, allowing for arrays with
*/
public JsonObjectDecoder(int maxObjectLength, boolean streamArrayElements) {
super(new MimeType("application", "json", StandardCharsets.UTF_8),
new MimeType("application", "*+json", StandardCharsets.UTF_8));
if (maxObjectLength < 1) {
throw new IllegalArgumentException("maxObjectLength must be a positive int");
}
this.maxObjectLength = maxObjectLength;
this.streamArrayElements = streamArrayElements;
}
@Override
public Flux<String> decode(Publisher<DataBuffer> inputStream,
ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
return Flux.from(inputStream)
.flatMap(new Function<DataBuffer, Publisher<String>>() {
int openBraces;
int index;
int state;
boolean insideString;
ByteBuf input;
Integer writerIndex;
@Override
public Publisher<String> apply(DataBuffer buffer) {
List<DataBuffer> chunks = new ArrayList<>();
if (this.input == null) {
this.input = Unpooled.copiedBuffer(buffer.asByteBuffer());
DataBufferUtils.release(buffer);
this.writerIndex = this.input.writerIndex();
}
else {
this.index = this.index - this.input.readerIndex();
this.input = Unpooled.copiedBuffer(this.input,
Unpooled.copiedBuffer(buffer.asByteBuffer()));
DataBufferUtils.release(buffer);
this.writerIndex = this.input.writerIndex();
}
if (this.state == ST_CORRUPTED) {
this.input.skipBytes(this.input.readableBytes());
return Flux
.error(new IllegalStateException("Corrupted stream"));
}
if (this.writerIndex > maxObjectLength) {
// buffer size exceeded maxObjectLength; discarding the
// complete buffer.
this.input.skipBytes(this.input.readableBytes());
reset();
return Flux.error(new IllegalStateException(
"object length exceeds " + maxObjectLength + ": "
+ this.writerIndex + " bytes discarded"));
}
DataBufferFactory dataBufferFactory = buffer.factory();
for (/* use current index */; this.index < this.writerIndex; this.index++) {
byte c = this.input.getByte(this.index);
if (this.state == ST_DECODING_NORMAL) {
decodeByte(c, this.input, this.index);
// All opening braces/brackets have been closed. That's
// enough to conclude
// that the JSON object/array is complete.
if (this.openBraces == 0) {
ByteBuf json = extractObject(this.input,
this.input.readerIndex(),
this.index + 1 - this.input.readerIndex());
if (json != null) {
chunks.add(
dataBufferFactory.wrap(json.nioBuffer()));
}
// The JSON object/array was extracted => discard the
// bytes from
// the input buffer.
this.input.readerIndex(this.index + 1);
// Reset the object state to get ready for the next
// JSON object/text
// coming along the byte stream.
reset();
}
}
else if (this.state == ST_DECODING_ARRAY_STREAM) {
decodeByte(c, this.input, this.index);
if (!this.insideString
&& (this.openBraces == 1 && c == ','
|| this.openBraces == 0 && c == ']')) {
// skip leading spaces. No range check is needed and
// the loop will terminate
// because the byte at position index is not a
// whitespace.
for (int i = this.input.readerIndex(); Character
.isWhitespace(this.input.getByte(i)); i++) {
this.input.skipBytes(1);
}
// skip trailing spaces.
int idxNoSpaces = this.index - 1;
while (idxNoSpaces >= this.input.readerIndex()
&& Character.isWhitespace(
this.input.getByte(idxNoSpaces))) {
idxNoSpaces--;
}
ByteBuf json = extractObject(this.input,
this.input.readerIndex(),
idxNoSpaces + 1 - this.input.readerIndex());
if (json != null) {
chunks.add(
dataBufferFactory.wrap(json.nioBuffer()));
}
this.input.readerIndex(this.index + 1);
if (c == ']') {
reset();
}
}
// JSON object/array detected. Accumulate bytes until all
// braces/brackets are closed.
}
else if (c == '{' || c == '[') {
initDecoding(c, streamArrayElements);
if (this.state == ST_DECODING_ARRAY_STREAM) {
// Discard the array bracket
this.input.skipBytes(1);
}
// Discard leading spaces in front of a JSON object/array.
}
else if (Character.isWhitespace(c)) {
this.input.skipBytes(1);
}
else {
this.state = ST_CORRUPTED;
return Flux.error(new IllegalStateException(
"invalid JSON received at byte position "
+ this.index + ": "
+ ByteBufUtil.hexDump(this.input)));
}
}
return Flux.fromIterable(chunks).map(buf -> {
try {
return StreamUtils.copyToString(buf.asInputStream(),
Charsets.UTF_8);
}
catch (IOException e) {
throw new IllegalStateException(
"Cannot convert buffer to String" + buf, e);
}
});
}
/**
* Override this method if you want to filter the json objects/arrays
* that get passed through the pipeline.
*/
protected ByteBuf extractObject(ByteBuf buffer, int index,
int length) {
return buffer.slice(index, length).retain();
}
private void decodeByte(byte c, ByteBuf input, int index) {
if ((c == '{' || c == '[') && !this.insideString) {
this.openBraces++;
}
else if ((c == '}' || c == ']') && !this.insideString) {
this.openBraces--;
}
else if (c == '"') {
// start of a new JSON string. It's necessary to detect
// strings as they may
// also contain braces/brackets and that could lead to
// incorrect results.
if (!this.insideString) {
this.insideString = true;
// If the double quote wasn't escaped then this is the end
// of a string.
}
else if (input.getByte(index - 1) != '\\') {
this.insideString = false;
}
}
}
private void initDecoding(byte openingBrace,
boolean streamArrayElements) {
this.openBraces = 1;
if (openingBrace == '[' && streamArrayElements) {
this.state = ST_DECODING_ARRAY_STREAM;
}
else {
this.state = ST_DECODING_NORMAL;
}
}
private void reset() {
this.insideString = false;
this.state = ST_INIT;
this.openBraces = 0;
}
});
}
}

View File

@@ -0,0 +1,323 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.http.converter.HttpMessageNotWritableException;
import reactor.core.publisher.Flux;
/**
* Converter for request bodies of type <code>Flux<String></code>.
*
* @author Dave Syer
*
*/
public class FluxHttpMessageConverter implements HttpMessageConverter<Flux<Object>> {
private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream");
@Override
public boolean canRead(Class<?> clazz, MediaType mediaType) {
return Flux.class.isAssignableFrom(clazz);
}
@Override
public boolean canWrite(Class<?> clazz, MediaType mediaType) {
return false;
}
@Override
public List<MediaType> getSupportedMediaTypes() {
return Arrays.asList(MediaType.ALL);
}
@Override
public Flux<Object> read(Class<? extends Flux<Object>> clazz,
HttpInputMessage inputMessage)
throws IOException, HttpMessageNotReadableException {
MediaType mediaType = inputMessage.getHeaders().getContentType();
if (mediaType != null) {
if (mediaType.includes(MediaType.APPLICATION_JSON)) {
return new JsonObjectDecoder().decode(inputMessage.getBody());
}
if (mediaType.includes(EVENT_STREAM)) {
return splitOnSseData(inputMessage);
}
}
return splitOnLineEndings(inputMessage);
}
private Flux<Object> splitOnLineEndings(HttpInputMessage inputMessage) {
return Flux.create(sink -> {
BufferedReader reader;
try {
reader = new BufferedReader(
new InputStreamReader(inputMessage.getBody()));
String line = reader.readLine();
while (line != null) {
sink.next(line);
line = reader.readLine();
}
}
catch (IOException e) {
sink.error(e);
}
sink.complete();
});
}
private Flux<Object> splitOnSseData(HttpInputMessage inputMessage) {
return Flux.create(sink -> {
BufferedReader reader;
StringBuffer buffer = new StringBuffer();
int emptyCount = 0;
try {
reader = new BufferedReader(
new InputStreamReader(inputMessage.getBody()));
String line = reader.readLine();
while (line != null) {
if (line.length() == 0) {
emptyCount++;
}
else {
if (buffer.length() == 0) {
if (line.startsWith("data:")) {
line = line.length() > "data:".length()
? line.substring("data:".length()) : "";
}
}
else {
buffer.append("\n");
}
buffer.append(line);
}
if (emptyCount > 0) {
sink.next(buffer.toString());
buffer.setLength(0);
emptyCount = 0;
while (line != null && line.length() == 0) {
line = reader.readLine();
}
}
else {
line = reader.readLine();
}
}
if (buffer.length()>0) {
sink.next(buffer.toString());
}
}
catch (IOException e) {
sink.error(e);
}
sink.complete();
});
}
@Override
public void write(Flux<Object> t, MediaType contentType,
HttpOutputMessage outputMessage)
throws IOException, HttpMessageNotWritableException {
}
static class JsonObjectDecoder {
private static final int ST_CORRUPTED = -1;
private static final int ST_INIT = 0;
private static final int ST_DECODING_NORMAL = 1;
private static final int ST_DECODING_ARRAY_STREAM = 2;
private final int maxObjectLength = 1024 * 1024;
private int openBraces;
private int state;
private boolean insideString;
private int writerIndex;
private boolean streamArrayElements = true;
public Flux<Object> decode(InputStream body) {
InputStreamReader reader = new InputStreamReader(body);
char[] buffer = new char[1024];
try {
List<String> chunks = new ArrayList<>();
int read = reader.read(buffer);
this.writerIndex += read;
while (read >= 0) {
if (this.state == ST_CORRUPTED) {
return Flux.error(new IllegalStateException("Corrupted stream"));
}
if (this.writerIndex > maxObjectLength) {
// buffer size exceeded maxObjectLength; discarding the complete
// buffer.
reset();
return Flux.error(new IllegalStateException(
"object length exceeds " + maxObjectLength + ": "
+ this.writerIndex + " bytes discarded"));
}
int point = 0;
for (int index = 0; index < read; index++) {
char c = buffer[index];
if (this.state == ST_DECODING_NORMAL) {
decodeByte(c, buffer, index);
// All opening braces/brackets have been closed. That's enough
// to conclude that the JSON object/array is complete.
if (this.openBraces == 0) {
char[] json = extractObject(buffer, point,
index + 1 - point);
if (json != null) {
chunks.add(new String(json));
}
// The JSON object/array was extracted => discard the
// bytes from the input buffer.
point += index + 1 - point;
// Reset the object state to get ready for the next JSON
// object/text coming along the byte stream.
reset();
}
}
else if (this.state == ST_DECODING_ARRAY_STREAM) {
decodeByte(c, buffer, index);
if (!this.insideString && (this.openBraces == 1 && c == ','
|| this.openBraces == 0 && c == ']')) {
// skip leading spaces. No range check is needed and the
// loop will terminate because the byte at position index
// is not a whitespace.
for (int i = point; Character
.isWhitespace(buffer[i]); i++) {
point++;
}
// skip trailing spaces.
int idxNoSpaces = index - 1;
while (idxNoSpaces >= 0
&& Character.isWhitespace(buffer[idxNoSpaces])) {
idxNoSpaces--;
}
char[] json = extractObject(buffer, point,
idxNoSpaces + 1 - point);
if (json != null) {
chunks.add(new String(json));
}
point += index + 1 - point;
if (c == ']') {
reset();
}
}
// JSON object/array detected. Accumulate bytes until all
// braces/brackets are closed.
}
else if (c == '{' || c == '[') {
initDecoding(c, this.streamArrayElements);
if (this.state == ST_DECODING_ARRAY_STREAM) {
// Discard the array bracket
point++;
}
// Discard leading spaces in front of a JSON object/array.
}
else if (Character.isWhitespace(c)) {
point++;
}
else {
this.state = ST_CORRUPTED;
return Flux.error(new IllegalStateException(
"invalid JSON received at byte position "
+ writerIndex));
}
}
read = reader.read(buffer);
}
return Flux.fromIterable(chunks);
}
catch (IOException e) {
return Flux.error(new IllegalStateException("Cannot read stream", e));
}
}
private char[] extractObject(char[] buffer, int index, int length) {
if (length <= 0) {
return null;
}
return Arrays.copyOfRange(buffer, index, index + length);
}
private void decodeByte(char c, char[] input, int index) {
if ((c == '{' || c == '[') && !this.insideString) {
this.openBraces++;
}
else if ((c == '}' || c == ']') && !this.insideString) {
this.openBraces--;
}
else if (c == '"') {
// start of a new JSON string. It's necessary to detect strings as they
// may also contain braces/brackets and that could lead to incorrect
// results.
if (!this.insideString) {
this.insideString = true;
// If the double quote wasn't escaped then this is the end of a
// string.
}
else if (input[index - 1] != '\\') {
this.insideString = false;
}
}
}
private void initDecoding(char openingBrace, boolean streamArrayElements) {
this.openBraces = 1;
if (openingBrace == '[' && streamArrayElements) {
this.state = ST_DECODING_ARRAY_STREAM;
}
else {
this.state = ST_DECODING_NORMAL;
}
}
private void reset() {
this.insideString = false;
this.state = ST_INIT;
this.openBraces = 0;
}
}
}

View File

@@ -0,0 +1,41 @@
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import reactor.core.publisher.Flux;
/**
* A specialized {@link ResponseBodyEmitter} that handles {@link Flux} return types.
*
* @author Dave Syer
*/
class FluxResponseBodyEmitter<T> extends ResponseBodyEmitter {
public FluxResponseBodyEmitter(Flux<T> observable) {
this(null, null, observable);
}
public FluxResponseBodyEmitter(Long timeout, MediaType mediaType,
Flux<T> observable) {
super(timeout);
new ResponseBodyEmitterSubscriber<>(mediaType, observable, this);
}
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;
/**
* A specialized {@link ResponseBodyEmitter} that handles {@link Flux} return types with
* SSE streams.
*
* @author Dave Syer
*/
class FluxResponseSseEmitter<T> extends SseEmitter {
public FluxResponseSseEmitter(Flux<T> observable) {
this(null, MediaType.valueOf("text/event-stream"), observable);
}
public FluxResponseSseEmitter(Long timeout, MediaType mediaType, Flux<T> observable) {
super(timeout);
new ResponseBodyEmitterSubscriber<>(mediaType, observable, this);
}
}

View File

@@ -0,0 +1,93 @@
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler;
import org.springframework.web.method.support.ModelAndViewContainer;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler;
import reactor.core.publisher.Flux;
/**
* A specialized {@link AsyncHandlerMethodReturnValueHandler} that handles {@link Flux}
* return types.
*
* @author Dave Syer
*/
public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHandler {
private ResponseBodyEmitterReturnValueHandler delegate;
private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream");
public FluxReturnValueHandler(List<HttpMessageConverter<?>> messageConverters) {
delegate = new ResponseBodyEmitterReturnValueHandler(messageConverters);
}
@Override
public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) {
return returnValue != null && supportsReturnType(returnType);
}
@Override
public boolean supportsReturnType(MethodParameter returnType) {
return Flux.class.isAssignableFrom(returnType.getParameterType())
|| isResponseEntity(returnType);
}
private boolean isResponseEntity(MethodParameter returnType) {
if (ResponseEntity.class.isAssignableFrom(returnType.getParameterType())) {
Class<?> bodyType = ResolvableType.forMethodParameter(returnType)
.getGeneric(0).resolve();
return bodyType != null && Flux.class.isAssignableFrom(bodyType);
}
return false;
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest)
throws Exception {
Object adaptFrom = returnValue;
if (returnValue instanceof ResponseEntity) {
adaptFrom = ((ResponseEntity<?>) returnValue).getBody();
}
Flux<?> flux = (Flux<?>) adaptFrom;
MediaType mediaType = webRequest.getHeader("Accept") == null ? null
: MediaType.parseMediaTypes(webRequest.getHeader("Accept")).iterator()
.next();
delegate.handleReturnValue(getEmitter(1000L, flux, mediaType),
returnType, mavContainer, webRequest);
}
private ResponseBodyEmitter getEmitter(Long timeout, Flux<?> flux, MediaType mediaType) {
if (EVENT_STREAM.isCompatibleWith(mediaType)) {
return new FluxResponseSseEmitter<>(timeout, mediaType, flux);
}
return new FluxResponseBodyEmitter<>(timeout, mediaType, flux);
}
}

View File

@@ -0,0 +1,64 @@
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.autoconfigure.web.HttpMessageConverters;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler;
import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*/
@Configuration
@ConditionalOnWebApplication
@ConditionalOnClass({ Flux.class, AsyncHandlerMethodReturnValueHandler.class })
public class ReactorAutoConfiguration extends WebMvcConfigurerAdapter {
@Autowired
private FluxReturnValueHandler returnValueHandler;
@Bean
public FluxReturnValueHandler fluxReturnValueHandler(
HttpMessageConverters converters) {
return new FluxReturnValueHandler(converters.getConverters());
}
@Configuration
protected static class MessageConverters {
@Bean
public HttpMessageConverters httpMessageConverters() {
return new HttpMessageConverters(new FluxHttpMessageConverter());
}
}
@Override
public void addReturnValueHandlers(
List<HandlerMethodReturnValueHandler> returnValueHandlers) {
returnValueHandlers.add(returnValueHandler);
}
}

View File

@@ -0,0 +1,130 @@
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
import java.io.IOException;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import reactor.core.publisher.Flux;
/**
* Subscriber that emits any value produced by the {@link Flux} into the delegated
* {@link ResponseBodyEmitter}.
*
* @author Dave Syer
*/
class ResponseBodyEmitterSubscriber<T> implements Subscriber<T>, Runnable {
private final MediaType mediaType;
private Subscription subscription;
private final ResponseBodyEmitter responseBodyEmitter;
private boolean completed;
private boolean firstElementWritten;
public ResponseBodyEmitterSubscriber(MediaType mediaType, Flux<T> observable,
ResponseBodyEmitter responseBodyEmitter) {
this.mediaType = mediaType;
this.responseBodyEmitter = responseBodyEmitter;
this.responseBodyEmitter.onTimeout(this);
this.responseBodyEmitter.onCompletion(this);
observable.subscribe(this);
}
@Override
public void onSubscribe(Subscription subscription) {
if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
try {
this.responseBodyEmitter.send("[");
}
catch (IOException e) {
// Urgh?
}
}
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(T value) {
Object object = value;
try {
if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
if (!this.firstElementWritten) {
this.firstElementWritten = true;
}
else {
responseBodyEmitter.send(",");
}
if (value.getClass()==String.class) {
object = "\"" + value + "\"";
}
}
if (!completed) {
responseBodyEmitter.send(object, mediaType);
}
}
catch (
IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
public void onError(Throwable e) {
responseBodyEmitter.completeWithError(e);
}
@Override
public void onComplete() {
if (!completed) {
completed = true;
try {
if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
if (!this.firstElementWritten) {
this.firstElementWritten = true;
}
else {
responseBodyEmitter.send("]");
}
}
}
catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
responseBodyEmitter.complete();
}
}
@Override
public void run() {
this.subscription.cancel();
}
}

View File

@@ -47,17 +47,17 @@ import reactor.core.publisher.Flux;
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class RestApplicationTests {
private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream");
@LocalServerPort
private int port;
private TestRestTemplate rest = new TestRestTemplate();
@Test
public void wordsSSE() throws Exception {
assertThat(
rest.exchange(
RequestEntity.get(new URI("http://localhost:" + port + "/words"))
.accept(MediaType.TEXT_EVENT_STREAM).build(),
String.class).getBody()).isEqualTo(sse("foo", "bar"));
assertThat(rest.exchange(
RequestEntity.get(new URI("http://localhost:" + port + "/words"))
.accept(EVENT_STREAM).build(),
String.class).getBody()).isEqualTo(sse("foo", "bar"));
}
@Test
@@ -100,17 +100,17 @@ public class RestApplicationTests {
.exchange(
RequestEntity.post(new URI("http://localhost:" + port + "/maps"))
.contentType(MediaType.APPLICATION_JSON)
.body("{\"value\":\"foo\"}{\"value\":\"bar\"}"),
// TODO: make this work without newline separator
.body("{\"value\":\"foo\"}\n{\"value\":\"bar\"}"),
String.class)
.getBody()).isEqualTo("{\"value\":\"FOO\"}{\"value\":\"BAR\"}");
}
@Test
public void uppercaseSSE() throws Exception {
assertThat(rest.exchange(
RequestEntity.post(new URI("http://localhost:" + port + "/uppercase"))
.accept(MediaType.TEXT_EVENT_STREAM)
.contentType(MediaType.TEXT_EVENT_STREAM).body(sse("foo", "bar")),
assertThat(rest.exchange(RequestEntity
.post(new URI("http://localhost:" + port + "/uppercase"))
.accept(EVENT_STREAM).contentType(EVENT_STREAM).body(sse("foo", "bar")),
String.class).getBody()).isEqualTo(sse("[FOO]", "[BAR]"));
}
@@ -123,7 +123,7 @@ public class RestApplicationTests {
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux.map(value -> "[" + value.trim().toUpperCase() + "]");
return flux -> flux.log().map(value -> "[" + value.trim().toUpperCase() + "]");
}
@Bean

View File

@@ -0,0 +1,108 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.flux;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.mock.http.MockHttpInputMessage;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
public class FluxHttpMessageConverterTests {
private FluxHttpMessageConverter converter = new FluxHttpMessageConverter();
private Class<Flux<Object>> type = null;
@Test
public void newlines() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage("foo\nbar".getBytes());
assertThat(converter.read(type, message).collectList().block()).contains("foo",
"bar");
}
@Test
public void sse() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"data:foo\n\ndata:bar".getBytes());
message.getHeaders().setContentType(MediaType.valueOf("text/event-stream"));
assertThat(converter.read(type, message).collectList().block()).contains("foo",
"bar");
}
@Test
public void jsonStream() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"{\"value\":\"foo\"}{\"value\":\"barrier\"}".getBytes());
message.getHeaders().setContentType(MediaType.APPLICATION_JSON);
assertThat(converter.read(type, message).collectList().block())
.contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}");
}
@Test
public void jsonStreamWhitespace() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"{\"value\":\"foo\"} {\"value\":\"barrier\"} ".getBytes());
message.getHeaders().setContentType(MediaType.APPLICATION_JSON);
assertThat(converter.read(type, message).collectList().block())
.contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}");
}
@Test
public void jsonStreamNewline() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"{\"value\":\"foo\"}\n{\"value\":\"barrier\"}".getBytes());
message.getHeaders().setContentType(MediaType.APPLICATION_JSON);
assertThat(converter.read(type, message).collectList().block())
.contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}");
}
@Test
public void jsonArray() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"[{\"value\":\"foo\"},{\"value\":\"barrier\"}]".getBytes());
message.getHeaders().setContentType(MediaType.APPLICATION_JSON);
assertThat(converter.read(type, message).collectList().block())
.contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}");
}
@Test
public void jsonArrayWhitespace() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"[{\"value\":\"foo\"}, {\"value\":\"barrier\"}] ".getBytes());
message.getHeaders().setContentType(MediaType.APPLICATION_JSON);
assertThat(converter.read(type, message).collectList().block())
.contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}");
}
@Test
public void jsonArrayNewline() throws Exception {
MockHttpInputMessage message = new MockHttpInputMessage(
"[{\"value\":\"foo\"},\n{\"value\":\"barrier\"}]".getBytes());
message.getHeaders().setContentType(MediaType.APPLICATION_JSON);
assertThat(converter.read(type, message).collectList().block())
.contains("{\"value\":\"foo\"}", "{\"value\":\"barrier\"}");
}
}