diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java deleted file mode 100644 index 74d7933542..0000000000 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.core.io.buffer; - -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.function.IntPredicate; - -/** - * Empty {@link DataBuffer} that indicates to the file or the socket writing it - * that previously buffered data should be flushed. - * - * @author Sebastien Deleuze - * @since 5.0 - * @see FlushingDataBuffer#INSTANCE - */ -public class FlushingDataBuffer implements DataBuffer { - - /** Singleton instance of this class */ - public static final FlushingDataBuffer INSTANCE = new FlushingDataBuffer(); - - private final DataBuffer buffer; - - - private FlushingDataBuffer() { - this.buffer = new DefaultDataBufferFactory().allocateBuffer(0); - } - - - @Override - public DataBufferFactory factory() { - return this.buffer.factory(); - } - - @Override - public int indexOf(IntPredicate predicate, int fromIndex) { - return this.buffer.indexOf(predicate, fromIndex); - } - - @Override - public int lastIndexOf(IntPredicate predicate, int fromIndex) { - return this.buffer.lastIndexOf(predicate, fromIndex); - } - - @Override - public int readableByteCount() { - return this.buffer.readableByteCount(); - } - - @Override - public byte read() { - return this.buffer.read(); - } - - @Override - public DataBuffer read(byte[] destination) { - return this.buffer.read(destination); - } - - @Override - public DataBuffer read(byte[] destination, int offset, int length) { - return this.buffer.read(destination, offset, length); - } - - @Override - public DataBuffer write(byte b) { - return this.buffer.write(b); - } - - @Override - public DataBuffer write(byte[] source) { - return this.buffer.write(source); - } - - @Override - public DataBuffer write(byte[] source, int offset, int length) { - return this.buffer.write(source, offset, length); - } - - @Override - public DataBuffer write(DataBuffer... buffers) { - return this.buffer.write(buffers); - } - - @Override - public DataBuffer write(ByteBuffer... buffers) { - return this.buffer.write(buffers); - } - - @Override - public DataBuffer slice(int index, int length) { - return this.buffer.slice(index, length); - } - - @Override - public ByteBuffer asByteBuffer() { - return this.buffer.asByteBuffer(); - } - - @Override - public InputStream asInputStream() { - return this.buffer.asInputStream(); - } - - @Override - public OutputStream asOutputStream() { - return this.buffer.asOutputStream(); - } - -} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java index 2a6a1d5886..e4bce79cb0 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java @@ -78,6 +78,24 @@ public class NettyDataBufferFactory implements DataBufferFactory { return new NettyDataBuffer(byteBuf, this); } + /** + * Returns the given Netty {@link DataBuffer} as a {@link ByteBuf}. Returns the + * {@linkplain NettyDataBuffer#getNativeBuffer() native buffer} if {@code buffer} is + * a {@link NettyDataBuffer}; returns {@link Unpooled#wrappedBuffer(ByteBuffer)} + * otherwise. + * @param buffer the {@code DataBuffer} to return a {@code ByteBuf} for. + * @return the netty {@code ByteBuf} + */ + public static ByteBuf toByteBuf(DataBuffer buffer) { + if (buffer instanceof NettyDataBuffer) { + return ((NettyDataBuffer) buffer).getNativeBuffer(); + } + else { + return Unpooled.wrappedBuffer(buffer.asByteBuffer()); + } + } + + @Override public String toString() { return "NettyDataBufferFactory (" + this.byteBufAllocator + ")"; diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/config/WebReactiveConfiguration.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/config/WebReactiveConfiguration.java index b06b6637df..6244967dc4 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/config/WebReactiveConfiguration.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/config/WebReactiveConfiguration.java @@ -41,7 +41,6 @@ import org.springframework.format.FormatterRegistry; import org.springframework.format.support.DefaultFormattingConversionService; import org.springframework.format.support.FormattingConversionService; import org.springframework.http.MediaType; -import org.springframework.http.codec.SseEventEncoder; import org.springframework.http.codec.json.JacksonJsonDecoder; import org.springframework.http.codec.json.JacksonJsonEncoder; import org.springframework.http.codec.xml.Jaxb2Decoder; @@ -51,6 +50,7 @@ import org.springframework.http.converter.reactive.EncoderHttpMessageWriter; import org.springframework.http.converter.reactive.HttpMessageReader; import org.springframework.http.converter.reactive.HttpMessageWriter; import org.springframework.http.converter.reactive.ResourceHttpMessageWriter; +import org.springframework.http.converter.reactive.SseEventHttpMessageWriter; import org.springframework.util.ClassUtils; import org.springframework.validation.Errors; import org.springframework.validation.Validator; @@ -391,7 +391,7 @@ public class WebReactiveConfiguration implements ApplicationContextAware { writers.add(new EncoderHttpMessageWriter<>(jacksonEncoder)); sseDataEncoders.add(jacksonEncoder); } - writers.add(new EncoderHttpMessageWriter<>(new SseEventEncoder(sseDataEncoders))); + writers.add(new SseEventHttpMessageWriter(sseDataEncoders)); } /** * Override this to modify the list of message writers after it has been diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index dc0fd016c0..cc1bf2b27a 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -17,8 +17,6 @@ package org.springframework.web.reactive.result.method.annotation; import java.time.Duration; -import java.util.ArrayList; -import java.util.List; import org.junit.Before; import org.junit.Test; @@ -29,13 +27,9 @@ import reactor.test.TestSubscriber; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.codec.StringDecoder; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.http.codec.SseEvent; -import org.springframework.http.codec.json.JacksonJsonDecoder; -import org.springframework.http.converter.reactive.DecoderHttpMessageReader; -import org.springframework.http.converter.reactive.HttpMessageReader; import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.bootstrap.JettyHttpServer; diff --git a/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java b/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java index 41013728f3..8d12cb8297 100644 --- a/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java +++ b/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java @@ -23,7 +23,6 @@ import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.FlushingDataBuffer; /** * A "reactive" HTTP output message that accepts output as a {@link Publisher}. @@ -45,17 +44,23 @@ public interface ReactiveHttpOutputMessage extends HttpMessage { /** * Use the given {@link Publisher} to write the body of the message to the underlying - * HTTP layer, and flush the data when the complete signal is received (data could be - * flushed before depending on the configuration, the HTTP engine and the amount of - * data sent). - * - *

Each {@link FlushingDataBuffer} element will trigger a flush. + * HTTP layer. * * @param body the body content publisher * @return a publisher that indicates completion or error. */ Mono writeWith(Publisher body); + /** + * Use the given {@link Publisher} of {@code Publishers} to write the body of the + * message to the underlying HTTP layer, flushing after each + * {@code Publisher}. + * + * @param body the body content publisher + * @return a publisher that indicates completion or error. + */ + Mono writeAndFlushWith(Publisher> body); + /** * Returns a {@link DataBufferFactory} that can be used for creating the body. * @return a buffer factory diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java index 039411970d..a1e8e38720 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -17,9 +17,9 @@ package org.springframework.http.client.reactive; import java.net.URI; +import java.util.Collection; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.cookie.DefaultCookie; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -28,7 +28,6 @@ import reactor.io.netty.http.HttpClientRequest; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpMethod; @@ -50,7 +49,8 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { private final NettyDataBufferFactory bufferFactory; - public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClientRequest httpRequest) { + public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, + HttpClientRequest httpRequest) { this.httpMethod = httpMethod; this.uri = uri; this.httpRequest = httpRequest; @@ -75,8 +75,21 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono writeWith(Publisher body) { - return applyBeforeCommit() - .then(httpRequest.send(Flux.from(body).map(this::toByteBuf))); + return applyBeforeCommit().then(this.httpRequest + .send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf))); + } + + @Override + public Mono writeAndFlushWith(Publisher> body) { + Publisher> byteBufs = Flux.from(body). + map(ReactorClientHttpRequest::toByteBufs); + return applyBeforeCommit().then(this.httpRequest + .sendAndFlush(byteBufs)); + } + + private static Publisher toByteBufs(Publisher dataBuffers) { + return Flux.from(dataBuffers). + map(NettyDataBufferFactory::toByteBuf); } @Override @@ -84,27 +97,17 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { return applyBeforeCommit().then(httpRequest.sendHeaders()); } - private ByteBuf toByteBuf(DataBuffer buffer) { - if (buffer instanceof NettyDataBuffer) { - return ((NettyDataBuffer) buffer).getNativeBuffer(); - } - else { - return Unpooled.wrappedBuffer(buffer.asByteBuffer()); - } - } - @Override protected void writeHeaders() { - getHeaders().entrySet().stream() + getHeaders().entrySet() .forEach(e -> this.httpRequest.headers().set(e.getKey(), e.getValue())); } @Override protected void writeCookies() { - getCookies().values() - .stream().flatMap(cookies -> cookies.stream()) + getCookies().values().stream().flatMap(Collection::stream) .map(cookie -> new DefaultCookie(cookie.getName(), cookie.getValue())) - .forEach(cookie -> this.httpRequest.addCookie(cookie)); + .forEach(this.httpRequest::addCookie); } } \ No newline at end of file diff --git a/spring-web/src/main/java/org/springframework/http/codec/SseEvent.java b/spring-web/src/main/java/org/springframework/http/codec/SseEvent.java index 610d639b53..90dc5a23f6 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/SseEvent.java +++ b/spring-web/src/main/java/org/springframework/http/codec/SseEvent.java @@ -17,7 +17,7 @@ package org.springframework.http.codec; import org.springframework.http.MediaType; -import org.springframework.http.codec.SseEventEncoder; +import org.springframework.http.converter.reactive.SseEventHttpMessageWriter; /** * Representation for a Server-Sent Event for use with Spring's reactive Web @@ -26,7 +26,7 @@ import org.springframework.http.codec.SseEventEncoder; * * @author Sebastien Deleuze * @since 5.0 - * @see SseEventEncoder + * @see SseEventHttpMessageWriter * @see Server-Sent Events W3C recommendation */ public class SseEvent { @@ -96,10 +96,10 @@ public class SseEvent { /** * Set {@code data} SSE field. If a multiline {@code String} is provided, it will be - * turned into multiple {@code data} field lines by as - * defined in Server-Sent Events W3C recommendation. + * turned into multiple {@code data} field lines as defined in Server-Sent Events + * W3C recommendation. * - * If no {@code mediaType} is defined, default {@link SseEventEncoder} will: + * If no {@code mediaType} is defined, default {@link SseEventHttpMessageWriter} will: * - Turn single line {@code String} to a single {@code data} field * - Turn multiline line {@code String} to multiple {@code data} fields * - Serialize other {@code Object} as JSON @@ -119,7 +119,7 @@ public class SseEvent { /** * Set the {@link MediaType} used to serialize the {@code data}. - * {@link SseEventEncoder} should be configured with the relevant encoder to be + * {@link SseEventHttpMessageWriter} should be configured with the relevant encoder to be * able to serialize it. */ public void setMediaType(MediaType mediaType) { @@ -149,7 +149,7 @@ public class SseEvent { /** * Set SSE comment. If a multiline comment is provided, it will be turned into multiple - * SSE comment lines by {@link SseEventEncoder} as defined in Server-Sent Events W3C + * SSE comment lines by {@link SseEventHttpMessageWriter} as defined in Server-Sent Events W3C * recommendation. */ public void setComment(String comment) { diff --git a/spring-web/src/main/java/org/springframework/http/codec/SseEventEncoder.java b/spring-web/src/main/java/org/springframework/http/converter/reactive/SseEventHttpMessageWriter.java similarity index 61% rename from spring-web/src/main/java/org/springframework/http/codec/SseEventEncoder.java rename to spring-web/src/main/java/org/springframework/http/converter/reactive/SseEventHttpMessageWriter.java index 923a3d271d..b1458cc5a3 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/SseEventEncoder.java +++ b/spring-web/src/main/java/org/springframework/http/converter/reactive/SseEventHttpMessageWriter.java @@ -14,9 +14,10 @@ * limitations under the License. */ -package org.springframework.http.codec; +package org.springframework.http.converter.reactive; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.List; import java.util.Optional; @@ -25,15 +26,14 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; -import org.springframework.core.codec.AbstractEncoder; import org.springframework.core.codec.CodecException; import org.springframework.core.codec.Encoder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.FlushingDataBuffer; import org.springframework.http.MediaType; +import org.springframework.http.ReactiveHttpOutputMessage; +import org.springframework.http.codec.SseEvent; import org.springframework.util.Assert; -import org.springframework.util.MimeType; /** * Encoder that supports a stream of {@link SseEvent}s and also plain @@ -42,25 +42,53 @@ import org.springframework.util.MimeType; * * @author Sebastien Deleuze * @since 5.0 + * @author Arjen Poutsma */ -public class SseEventEncoder extends AbstractEncoder { +public class SseEventHttpMessageWriter implements HttpMessageWriter { + + private static final MediaType TEXT_EVENT_STREAM = + new MediaType("text", "event-stream"); private final List> dataEncoders; - - public SseEventEncoder(List> dataEncoders) { - super(new MimeType("text", "event-stream")); + public SseEventHttpMessageWriter(List> dataEncoders) { Assert.notNull(dataEncoders, "'dataEncoders' must not be null"); this.dataEncoders = dataEncoders; } @Override - public Flux encode(Publisher inputStream, DataBufferFactory bufferFactory, - ResolvableType type, MimeType sseMimeType, Object... hints) { + public boolean canWrite(ResolvableType type, MediaType mediaType) { + return mediaType == null || TEXT_EVENT_STREAM.isCompatibleWith(mediaType); + } - return Flux.from(inputStream).flatMap(input -> { - SseEvent event = (SseEvent.class.equals(type.getRawClass()) ? - (SseEvent)input : new SseEvent(input)); + @Override + public List getWritableMediaTypes() { + return Collections.singletonList(TEXT_EVENT_STREAM); + } + + @Override + public Mono write(Publisher inputStream, ResolvableType type, + MediaType contentType, ReactiveHttpOutputMessage outputMessage) { + + outputMessage.getHeaders().setContentType(TEXT_EVENT_STREAM); + + DataBufferFactory bufferFactory = outputMessage.bufferFactory(); + Flux> body = encode(inputStream, bufferFactory, type); + + // Keep the SSE connection open even for cold stream in order to avoid + // unexpected browser reconnection + body = body.concatWith(Flux.never()); + + return outputMessage.writeAndFlushWith(body); + } + + private Flux> encode(Publisher inputStream, + DataBufferFactory bufferFactory, ResolvableType type) { + + return Flux.from(inputStream).map(input -> { + SseEvent event = + (SseEvent.class.equals(type.getRawClass()) ? (SseEvent) input : + new SseEvent(input)); StringBuilder sb = new StringBuilder(); @@ -90,26 +118,20 @@ public class SseEventEncoder extends AbstractEncoder { Object data = event.getData(); Flux dataBuffer = Flux.empty(); - MediaType mediaType = (event.getMediaType() == null ? MediaType.ALL : event.getMediaType()); + MediaType mediaType = + (event.getMediaType() == null ? MediaType.ALL : event.getMediaType()); if (data != null) { sb.append("data:"); if (data instanceof String) { - sb.append(((String)data).replaceAll("\\n", "\ndata:")).append("\n"); + sb.append(((String) data).replaceAll("\\n", "\ndata:")).append("\n"); } else { dataBuffer = applyEncoder(data, mediaType, bufferFactory); } } - // Keep the SSE connection open even for cold stream in order to avoid - // unexpected browser reconnection - return Flux.concat( - encodeString(sb.toString(), bufferFactory), - dataBuffer, - encodeString("\n", bufferFactory), - Mono.just(FlushingDataBuffer.INSTANCE), - Flux.never() - ); + return Flux.concat(encodeString(sb.toString(), bufferFactory), dataBuffer, + encodeString("\n", bufferFactory)); }); } @@ -121,10 +143,7 @@ public class SseEventEncoder extends AbstractEncoder { .stream() .filter(e -> e.canEncode(elementType, mediaType)) .findFirst(); - if (!encoder.isPresent()) { - return Flux.error(new CodecException("No suitable encoder found!")); - } - return ((Encoder) encoder.get()) + return ((Encoder) encoder.orElseThrow(() -> new CodecException("No suitable encoder found!"))) .encode(Mono.just((T) data), bufferFactory, elementType, mediaType) .concatWith(encodeString("\n", bufferFactory)); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java new file mode 100644 index 0000000000..3d58304385 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java @@ -0,0 +1,84 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; + +/** + * Abstract base class for listener-based server responses, i.e. Servlet 3.1 and Undertow. + * @author Arjen Poutsma + * @since 5.0 + */ +public abstract class AbstractListenerServerHttpResponse extends AbstractServerHttpResponse { + + private final AtomicBoolean writeCalled = new AtomicBoolean(); + + public AbstractListenerServerHttpResponse(DataBufferFactory dataBufferFactory) { + super(dataBufferFactory); + } + + @Override + protected final Mono writeWithInternal(Publisher body) { + if (this.writeCalled.compareAndSet(false, true)) { + Processor bodyProcessor = createBodyProcessor(); + return Mono.from(subscriber -> { + body.subscribe(bodyProcessor); + bodyProcessor.subscribe(subscriber); + }); + + } else { + return Mono.error(new IllegalStateException( + "writeWith() or writeAndFlushWith() has already been called")); + } + } + + @Override + protected final Mono writeAndFlushWithInternal(Publisher> body) { + if (this.writeCalled.compareAndSet(false, true)) { + Processor, Void> bodyProcessor = + createBodyFlushProcessor(); + return Mono.from(subscriber -> { + body.subscribe(bodyProcessor); + bodyProcessor.subscribe(subscriber); + }); + } else { + return Mono.error(new IllegalStateException( + "writeWith() or writeAndFlushWith() has already been called")); + } + } + + /** + * Abstract template method to create a {@code Processor} that + * will write the response body to the underlying output. Called from + * {@link #writeWithInternal(Publisher)}. + */ + protected abstract Processor createBodyProcessor(); + + /** + * Abstract template method to create a {@code Processor, Void>} + * that will write the response body with flushes to the underlying output. Called from + * {@link #writeAndFlushWithInternal(Publisher)}. + */ + protected abstract Processor, Void> createBodyFlushProcessor(); +} diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java new file mode 100644 index 0000000000..ab2dc0a3de --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java @@ -0,0 +1,245 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import org.springframework.core.io.buffer.DataBuffer; + +/** + * Abstract base class for {@code Processor} implementations that bridge between + * event-listener APIs and Reactive Streams. Specifically, base class for the + * Servlet 3.1 and Undertow support. + * + * @author Arjen Poutsma + * @since 5.0 + * @see ServletServerHttpRequest + * @see UndertowHttpHandlerAdapter + * @see ServerHttpResponse#writeAndFlushWith(Publisher) + */ +abstract class AbstractResponseBodyFlushProcessor + implements Processor, Void> { + + protected final Log logger = LogFactory.getLog(getClass()); + + private final ResponseBodyWriteResultPublisher publisherDelegate = + new ResponseBodyWriteResultPublisher(); + + private final AtomicReference state = + new AtomicReference<>(State.UNSUBSCRIBED); + + private volatile boolean subscriberCompleted; + + private Subscription subscription; + + // Subscriber + + @Override + public final void onSubscribe(Subscription subscription) { + if (logger.isTraceEnabled()) { + logger.trace(this.state + " onSubscribe: " + subscription); + } + this.state.get().onSubscribe(this, subscription); + } + + @Override + public final void onNext(Publisher publisher) { + if (logger.isTraceEnabled()) { + logger.trace(this.state + " onNext: " + publisher); + } + this.state.get().onNext(this, publisher); + } + + @Override + public final void onError(Throwable t) { + if (logger.isErrorEnabled()) { + logger.error(this.state + " onError: " + t, t); + } + this.state.get().onError(this, t); + } + + @Override + public final void onComplete() { + if (logger.isTraceEnabled()) { + logger.trace(this.state + " onComplete"); + } + this.state.get().onComplete(this); + } + + // Publisher + + @Override + public final void subscribe(Subscriber subscriber) { + this.publisherDelegate.subscribe(subscriber); + } + + /** + * Creates a new processor for subscribing to a body chunk. + */ + protected abstract Processor createBodyProcessor(); + + /** + * Flushes the output. + */ + protected abstract void flush() throws IOException; + + private void writeComplete() { + if (logger.isTraceEnabled()) { + logger.trace(this.state + " writeComplete"); + } + this.state.get().writeComplete(this); + + } + + private boolean changeState(State oldState, State newState) { + return this.state.compareAndSet(oldState, newState); + } + + private enum State { + UNSUBSCRIBED { + @Override + public void onSubscribe(AbstractResponseBodyFlushProcessor processor, + Subscription subscription) { + Objects.requireNonNull(subscription, "Subscription cannot be null"); + if (processor.changeState(this, SUBSCRIBED)) { + processor.subscription = subscription; + subscription.request(1); + } + else { + super.onSubscribe(processor, subscription); + } + } + }, SUBSCRIBED { + @Override + public void onNext(AbstractResponseBodyFlushProcessor processor, + Publisher chunk) { + Processor chunkProcessor = + processor.createBodyProcessor(); + chunk.subscribe(chunkProcessor); + chunkProcessor.subscribe(new WriteSubscriber(processor)); + } + + @Override + void onComplete(AbstractResponseBodyFlushProcessor processor) { + processor.subscriberCompleted = true; + } + + @Override + public void writeComplete(AbstractResponseBodyFlushProcessor processor) { + if (processor.subscriberCompleted) { + if (processor.changeState(this, COMPLETED)) { + processor.subscriberCompleted = true; + processor.publisherDelegate.publishComplete(); + } + } + else { + try { + processor.flush(); + } + catch (IOException ex) { + processor.onError(ex); + } + processor.subscription.request(1); + } + } + }, COMPLETED { + @Override + public void onNext(AbstractResponseBodyFlushProcessor processor, + Publisher publisher) { + // ignore + + } + + @Override + void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) { + // ignore + } + + @Override + void onComplete(AbstractResponseBodyFlushProcessor processor) { + // ignore + } + + @Override + public void writeComplete(AbstractResponseBodyFlushProcessor processor) { + // ignore + } + }; + + public void onSubscribe(AbstractResponseBodyFlushProcessor processor, + Subscription subscription) { + subscription.cancel(); + } + + public void onNext(AbstractResponseBodyFlushProcessor processor, + Publisher publisher) { + throw new IllegalStateException(toString()); + } + + void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) { + if (processor.changeState(this, COMPLETED)) { + processor.publisherDelegate.publishError(t); + } + } + + void onComplete(AbstractResponseBodyFlushProcessor processor) { + throw new IllegalStateException(toString()); + } + + public void writeComplete(AbstractResponseBodyFlushProcessor processor) { + throw new IllegalStateException(toString()); + } + + private static class WriteSubscriber implements Subscriber { + + private final AbstractResponseBodyFlushProcessor processor; + + public WriteSubscriber(AbstractResponseBodyFlushProcessor processor) { + this.processor = processor; + } + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Void aVoid) { + } + + @Override + public void onError(Throwable t) { + processor.onError(t); + } + + @Override + public void onComplete() { + processor.writeComplete(); + } + } + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java index 58e1aedf70..29268d1e17 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java @@ -25,16 +25,16 @@ import javax.servlet.WriteListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.FlushingDataBuffer; import org.springframework.core.io.buffer.support.DataBufferUtils; import org.springframework.util.Assert; /** - * Abstract base class for {@code Subscriber} implementations that bridge between + * Abstract base class for {@code Processor} implementations that bridge between * event-listener APIs and Reactive Streams. Specifically, base class for the * Servlet 3.1 and Undertow support. * @@ -42,6 +42,7 @@ import org.springframework.util.Assert; * @since 5.0 * @see ServletServerHttpRequest * @see UndertowHttpHandlerAdapter + * @see ServerHttpResponse#writeWith(Publisher) */ abstract class AbstractResponseBodyProcessor implements Processor { @@ -158,11 +159,6 @@ abstract class AbstractResponseBodyProcessor implements Processor writeWith(Publisher publisher) { - return new ChannelSendOperator<>(publisher, writePublisher -> - applyBeforeCommit().then(() -> writeWithInternal(writePublisher))); + public final Mono writeWith(Publisher body) { + return new ChannelSendOperator<>(body, writePublisher -> applyBeforeCommit() + .then(() -> writeWithInternal(writePublisher))); + } + + @Override + public final Mono writeAndFlushWith(Publisher> body) { + return new ChannelSendOperator<>(body, writePublisher -> applyBeforeCommit() + .then(() -> writeAndFlushWithInternal(writePublisher))); } protected Mono applyBeforeCommit() { @@ -160,6 +166,14 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { */ protected abstract Mono writeWithInternal(Publisher body); + /** + * Implement this method to write to the underlying the response, and flush after + * each {@code Publisher}. + * @param body the publisher to write and flush with + */ + protected abstract Mono writeAndFlushWithInternal( + Publisher> body); + /** * Implement this method to write the status code to the underlying response. * This method is called once only. diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index cb24f8fbbc..6a7194d581 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -19,7 +19,6 @@ package org.springframework.http.server.reactive; import java.io.File; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.DefaultCookie; @@ -30,8 +29,7 @@ import reactor.io.netty.http.HttpChannel; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.FlushingDataBuffer; -import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; import org.springframework.http.ZeroCopyHttpOutputMessage; @@ -73,12 +71,16 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse @Override protected Mono writeWithInternal(Publisher publisher) { - return Flux.from(publisher) - .window() - .concatMap(w -> this.channel.send(w - .takeUntil(db -> db instanceof FlushingDataBuffer) - .map(this::toByteBuf))) - .then(); + Publisher body = toByteBufs(publisher); + return this.channel.send(body); + } + + @Override + protected Mono writeAndFlushWithInternal( + Publisher> publisher) { + Publisher> body = Flux.from(publisher). + map(ReactorServerHttpResponse::toByteBufs); + return this.channel.sendAndFlush(body); } @Override @@ -107,20 +109,16 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse } } - private ByteBuf toByteBuf(DataBuffer buffer) { - if (buffer instanceof NettyDataBuffer) { - return ((NettyDataBuffer) buffer).getNativeBuffer(); - } - else { - return Unpooled.wrappedBuffer(buffer.asByteBuffer()); - } - } - @Override public Mono writeWith(File file, long position, long count) { - return applyBeforeCommit().then(() -> { - return this.channel.sendFile(file, position, count); - }); + return applyBeforeCommit().then(() -> this.channel.sendFile(file, position, count)); } + private static Publisher toByteBufs(Publisher dataBuffers) { + return Flux.from(dataBuffers). + map(NettyDataBufferFactory::toByteBuf); + } + + + } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java index 9bcb988f8c..53f73da4b3 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java @@ -17,7 +17,6 @@ package org.springframework.http.server.reactive; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.cookie.Cookie; @@ -26,12 +25,11 @@ import io.reactivex.netty.protocol.http.server.HttpServerResponse; import io.reactivex.netty.protocol.http.server.ResponseContentWriter; import org.reactivestreams.Publisher; import reactor.adapter.RxJava1Adapter; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import rx.Observable; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.FlushingDataBuffer; -import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; @@ -48,6 +46,7 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { private final HttpServerResponse response; + private static final ByteBuf FLUSH_SIGNAL = Unpooled.buffer(0, 0); public RxNettyServerHttpResponse(HttpServerResponse response, NettyDataBufferFactory dataBufferFactory) { @@ -73,23 +72,32 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { @Override protected Mono writeWithInternal(Publisher body) { - Observable content = RxJava1Adapter.publisherToObservable(body).map(this::toByteBuf); - ResponseContentWriter writer = this.response.write(content, bb -> bb instanceof FlushingByteBuf); - return RxJava1Adapter.observableToFlux(writer).then(); + Observable content = RxJava1Adapter.publisherToObservable(body) + .map(NettyDataBufferFactory::toByteBuf); + return RxJava1Adapter.observableToFlux(this.response.write(content)) + .then(); } - private ByteBuf toByteBuf(DataBuffer buffer) { - ByteBuf byteBuf = (buffer instanceof NettyDataBuffer ? - ((NettyDataBuffer) buffer).getNativeBuffer() : - Unpooled.wrappedBuffer(buffer.asByteBuffer())); - return (buffer instanceof FlushingDataBuffer ? new FlushingByteBuf(byteBuf) : byteBuf); + @Override + protected Mono writeAndFlushWithInternal( + Publisher> body) { + Flux bodyWithFlushSignals = Flux.from(body). + flatMap(publisher -> { + return Flux.from(publisher). + map(NettyDataBufferFactory::toByteBuf). + concatWith(Mono.just(FLUSH_SIGNAL)); + }); + Observable content = RxJava1Adapter.publisherToObservable(bodyWithFlushSignals); + ResponseContentWriter writer = this.response.write(content, bb -> bb == FLUSH_SIGNAL); + return RxJava1Adapter.observableToFlux(writer).then(); } @Override protected void writeHeaders() { for (String name : getHeaders().keySet()) { - for (String value : getHeaders().get(name)) + for (String value : getHeaders().get(name)) { this.response.addHeader(name, value); + } } } @@ -110,15 +118,8 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { } } - private class FlushingByteBuf extends CompositeByteBuf { - public FlushingByteBuf(ByteBuf byteBuf) { - super(byteBuf.alloc(), byteBuf.isDirect(), 1); - this.addComponent(true, byteBuf); - } - } - -/* + /* While the underlying implementation of {@link ZeroCopyHttpOutputMessage} seems to work; it does bypass {@link #applyBeforeCommit} and more importantly it doesn't change its {@linkplain #state()). Therefore it's commented out, for now. diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index 1875f0057b..6ae0647e53 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -18,16 +18,17 @@ package org.springframework.http.server.reactive; import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletResponse; -import org.reactivestreams.Publisher; -import reactor.core.publisher.Mono; +import org.reactivestreams.Processor; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -38,13 +39,12 @@ import org.springframework.util.Assert; /** * Adapt {@link ServerHttpResponse} to the Servlet {@link HttpServletResponse}. - * * @author Rossen Stoyanchev * @since 5.0 */ -public class ServletServerHttpResponse extends AbstractServerHttpResponse { +public class ServletServerHttpResponse extends AbstractListenerServerHttpResponse { - private final Object bodyProcessorMonitor = new Object(); + private final AtomicBoolean listenerRegistered = new AtomicBoolean(); private volatile ResponseBodyProcessor bodyProcessor; @@ -52,6 +52,8 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse { private final int bufferSize; + private volatile boolean flushOnNext; + public ServletServerHttpResponse(HttpServletResponse response, DataBufferFactory dataBufferFactory, int bufferSize) throws IOException { super(dataBufferFactory); @@ -75,38 +77,6 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse { } } - @Override - protected Mono writeWithInternal(Publisher publisher) { - Assert.state(this.bodyProcessor == null, - "Response body publisher is already provided"); - try { - synchronized (this.bodyProcessorMonitor) { - if (this.bodyProcessor == null) { - this.bodyProcessor = createBodyProcessor(); - } - else { - throw new IllegalStateException( - "Response body publisher is already provided"); - } - } - return Mono.from(subscriber -> { - publisher.subscribe(this.bodyProcessor); - this.bodyProcessor.subscribe(subscriber); - }); - } - catch (IOException ex) { - return Mono.error(ex); - } - } - - private ResponseBodyProcessor createBodyProcessor() throws IOException { - ResponseBodyProcessor bodyProcessor = - new ResponseBodyProcessor(this.response.getOutputStream(), - this.bufferSize); - bodyProcessor.registerListener(); - return bodyProcessor; - } - @Override protected void writeHeaders() { for (Map.Entry> entry : getHeaders().entrySet()) { @@ -142,26 +112,59 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse { } } - private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor { + private void registerListener() throws IOException { + if (this.listenerRegistered.compareAndSet(false, true)) { + ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener(); + this.response.getOutputStream().setWriteListener(writeListener); + } + } - private final ResponseBodyWriteListener writeListener = - new ResponseBodyWriteListener(); + private void flush() throws IOException { + ServletOutputStream outputStream = this.response.getOutputStream(); + if (outputStream.isReady()) { + try { + outputStream.flush(); + this.flushOnNext = false; + } + catch (IOException ex) { + this.flushOnNext = true; + throw ex; + } + } + else { + this.flushOnNext = true; + } + } + + @Override + protected ResponseBodyProcessor createBodyProcessor() { + try { + registerListener(); + this.bodyProcessor = new ResponseBodyProcessor(this.response.getOutputStream(), + this.bufferSize); + return this.bodyProcessor; + } + catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + @Override + protected AbstractResponseBodyFlushProcessor createBodyFlushProcessor() { + return new ResponseBodyFlushProcessor(); + } + + private class ResponseBodyProcessor extends AbstractResponseBodyProcessor { private final ServletOutputStream outputStream; private final int bufferSize; - private volatile boolean flushOnNext; - public ResponseBodyProcessor(ServletOutputStream outputStream, int bufferSize) { this.outputStream = outputStream; this.bufferSize = bufferSize; } - public void registerListener() throws IOException { - this.outputStream.setWriteListener(this.writeListener); - } - @Override protected boolean isWritePossible() { return this.outputStream.isReady(); @@ -169,7 +172,10 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse { @Override protected boolean write(DataBuffer dataBuffer) throws IOException { - if (this.flushOnNext) { + if (ServletServerHttpResponse.this.flushOnNext) { + if (logger.isTraceEnabled()) { + logger.trace("flush"); + } flush(); } @@ -193,20 +199,6 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse { } } - @Override - protected void flush() throws IOException { - if (this.outputStream.isReady()) { - if (logger.isTraceEnabled()) { - logger.trace("flush"); - } - this.outputStream.flush(); - this.flushOnNext = false; - return; - } - this.flushOnNext = true; - - } - private int writeDataBuffer(DataBuffer dataBuffer) throws IOException { InputStream input = dataBuffer.asInputStream(); @@ -223,17 +215,40 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse { return bytesWritten; } - private class ResponseBodyWriteListener implements WriteListener { + } - @Override - public void onWritePossible() throws IOException { - ResponseBodyProcessor.this.onWritePossible(); + private class ResponseBodyWriteListener implements WriteListener { + + @Override + public void onWritePossible() throws IOException { + if (bodyProcessor != null) { + bodyProcessor.onWritePossible(); } + } - @Override - public void onError(Throwable ex) { - ResponseBodyProcessor.this.onError(ex); + @Override + public void onError(Throwable ex) { + if (bodyProcessor != null) { + bodyProcessor.onError(ex); } } } + + private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor { + + @Override + protected Processor createBodyProcessor() { + return ServletServerHttpResponse.this.createBodyProcessor(); + } + + @Override + protected void flush() throws IOException { + if (logger.isTraceEnabled()) { + logger.trace("flush"); + } + ServletServerHttpResponse.this.flush(); + } + + } + } \ No newline at end of file diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index 01a2487897..b4483aea48 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -28,7 +28,7 @@ import io.undertow.server.HttpServerExchange; import io.undertow.server.handlers.Cookie; import io.undertow.server.handlers.CookieImpl; import io.undertow.util.HttpString; -import org.reactivestreams.Publisher; +import org.reactivestreams.Processor; import org.xnio.ChannelListener; import org.xnio.channels.StreamSinkChannel; import reactor.core.publisher.Mono; @@ -48,15 +48,13 @@ import org.springframework.util.Assert; * @author Arjen Poutsma * @since 5.0 */ -public class UndertowServerHttpResponse extends AbstractServerHttpResponse +public class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse implements ZeroCopyHttpOutputMessage { - private final Object bodyProcessorMonitor = new Object(); - - private volatile ResponseBodyProcessor bodyProcessor; - private final HttpServerExchange exchange; + private StreamSinkChannel responseChannel; + public UndertowServerHttpResponse(HttpServerExchange exchange, DataBufferFactory dataBufferFactory) { super(dataBufferFactory); @@ -76,37 +74,6 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse } } - @Override - protected Mono writeWithInternal(Publisher publisher) { - Assert.state(this.bodyProcessor == null, - "Response body publisher is already provided"); - try { - synchronized (this.bodyProcessorMonitor) { - if (this.bodyProcessor == null) { - this.bodyProcessor = createBodyProcessor(); - } - else { - throw new IllegalStateException( - "Response body publisher is already provided"); - } - } - return Mono.from(subscriber -> { - publisher.subscribe(this.bodyProcessor); - this.bodyProcessor.subscribe(subscriber); - }); - } - catch (IOException ex) { - return Mono.error(ex); - } - } - - private ResponseBodyProcessor createBodyProcessor() throws IOException { - ResponseBodyProcessor bodyProcessor = new ResponseBodyProcessor(this.exchange); - bodyProcessor.registerListener(); - return bodyProcessor; - } - - @Override public Mono writeWith(File file, long position, long count) { writeHeaders(); @@ -156,6 +123,22 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse } } + @Override + protected ResponseBodyProcessor createBodyProcessor() { + if (this.responseChannel == null) { + this.responseChannel = this.exchange.getResponseChannel(); + } + ResponseBodyProcessor bodyProcessor = + new ResponseBodyProcessor( this.responseChannel); + bodyProcessor.registerListener(); + return bodyProcessor; + } + + @Override + protected AbstractResponseBodyFlushProcessor createBodyFlushProcessor() { + return new ResponseBodyFlushProcessor(); + } + private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor { private final ChannelListener listener = new WriteListener(); @@ -164,8 +147,9 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse private volatile ByteBuffer byteBuffer; - public ResponseBodyProcessor(HttpServerExchange exchange) { - this.responseChannel = exchange.getResponseChannel(); + public ResponseBodyProcessor(StreamSinkChannel responseChannel) { + Assert.notNull(responseChannel, "'responseChannel' must not be null"); + this.responseChannel = responseChannel; } public void registerListener() { @@ -173,14 +157,6 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse this.responseChannel.resumeWrites(); } - @Override - protected void flush() throws IOException { - if (logger.isTraceEnabled()) { - logger.trace("flush"); - } - this.responseChannel.flush(); - } - @Override protected boolean write(DataBuffer dataBuffer) throws IOException { if (this.byteBuffer == null) { @@ -231,4 +207,23 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse } } + + private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor { + + @Override + protected Processor createBodyProcessor() { + return UndertowServerHttpResponse.this.createBodyProcessor(); + } + + @Override + protected void flush() throws IOException { + if (UndertowServerHttpResponse.this.responseChannel != null) { + if (logger.isTraceEnabled()) { + logger.trace("flush"); + } + UndertowServerHttpResponse.this.responseChannel.flush(); + } + } + + } } diff --git a/spring-web/src/test/java/org/springframework/http/codec/SseEventEncoderTests.java b/spring-web/src/test/java/org/springframework/http/codec/SseEventEncoderTests.java deleted file mode 100644 index 13442bc701..0000000000 --- a/spring-web/src/test/java/org/springframework/http/codec/SseEventEncoderTests.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.codec; - -import java.util.Arrays; - -import static org.junit.Assert.*; -import org.junit.Test; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.test.TestSubscriber; - -import org.springframework.core.ResolvableType; -import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.FlushingDataBuffer; -import org.springframework.http.codec.json.JacksonJsonEncoder; -import org.springframework.util.MimeType; - -/** - * @author Sebastien Deleuze - */ -public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { - - @Test - public void nullMimeType() { - SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); - assertTrue(encoder.canEncode(ResolvableType.forClass(Object.class), null)); - } - - @Test - public void unsupportedMimeType() { - SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); - assertFalse(encoder.canEncode(ResolvableType.forClass(Object.class), new MimeType("foo", "bar"))); - } - - @Test - public void supportedMimeType() { - SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); - assertTrue(encoder.canEncode(ResolvableType.forClass(Object.class), new MimeType("text", "event-stream"))); - } - - @Test - public void encodeServerSentEvent() { - SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); - SseEvent event = new SseEvent(); - event.setId("c42"); - event.setName("foo"); - event.setComment("bla\nbla bla\nbla bla bla"); - event.setReconnectTime(123L); - Mono source = Mono.just(event); - Flux output = encoder.encode(source, this.bufferFactory, - ResolvableType.forClass(SseEvent.class), new MimeType("text", "event-stream")); - TestSubscriber - .subscribe(output) - .assertNoError() - .assertValuesWith( - stringConsumer( - "id:c42\n" + - "event:foo\n" + - "retry:123\n" + - ":bla\n:bla bla\n:bla bla bla\n"), - stringConsumer("\n"), - b -> assertEquals(FlushingDataBuffer.class, b.getClass()) - ); - } - - @Test - public void encodeString() { - SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); - Flux source = Flux.just("foo", "bar"); - Flux output = encoder.encode(source, this.bufferFactory, - ResolvableType.forClass(String.class), new MimeType("text", "event-stream")); - TestSubscriber - .subscribe(output) - .assertNoError() - .assertValuesWith( - stringConsumer("data:foo\n"), - stringConsumer("\n"), - b -> assertEquals(FlushingDataBuffer.class, b.getClass()), - stringConsumer("data:bar\n"), - stringConsumer("\n"), - b -> assertEquals(FlushingDataBuffer.class, b.getClass()) - ); - } - - @Test - public void encodeMultilineString() { - SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); - Flux source = Flux.just("foo\nbar", "foo\nbaz"); - Flux output = encoder.encode(source, this.bufferFactory, - ResolvableType.forClass(String.class), new MimeType("text", "event-stream")); - TestSubscriber - .subscribe(output) - .assertNoError() - .assertValuesWith( - stringConsumer("data:foo\ndata:bar\n"), - stringConsumer("\n"), - b -> assertEquals(FlushingDataBuffer.class, b.getClass()), - stringConsumer("data:foo\ndata:baz\n"), - stringConsumer("\n"), - b -> assertEquals(FlushingDataBuffer.class, b.getClass()) - ); - } - - @Test - public void encodePojo() { - SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); - Flux source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar")); - Flux output = encoder.encode(source, this.bufferFactory, - ResolvableType.forClass(Pojo.class), new MimeType("text", "event-stream")); - TestSubscriber - .subscribe(output) - .assertNoError() - .assertValuesWith( - stringConsumer("data:"), - stringConsumer("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"), - stringConsumer("\n"), - stringConsumer("\n"), - b -> assertEquals(FlushingDataBuffer.class, b.getClass()), - stringConsumer("data:"), - stringConsumer("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"), - stringConsumer("\n"), - stringConsumer("\n"), - b -> assertEquals(FlushingDataBuffer.class, b.getClass()) - ); - } - -} diff --git a/spring-web/src/test/java/org/springframework/http/converter/reactive/SseEventHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/converter/reactive/SseEventHttpMessageWriterTests.java new file mode 100644 index 0000000000..329e3b42f8 --- /dev/null +++ b/spring-web/src/test/java/org/springframework/http/converter/reactive/SseEventHttpMessageWriterTests.java @@ -0,0 +1,160 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.converter.reactive; + +import java.util.Collections; + +import org.junit.Test; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.TestSubscriber; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.MediaType; +import org.springframework.http.codec.Pojo; +import org.springframework.http.codec.SseEvent; +import org.springframework.http.codec.json.JacksonJsonEncoder; +import org.springframework.http.server.reactive.MockServerHttpResponse; + +import static org.junit.Assert.*; + +/** + * @author Sebastien Deleuze + */ +public class SseEventHttpMessageWriterTests + extends AbstractDataBufferAllocatingTestCase { + + private SseEventHttpMessageWriter converter = new SseEventHttpMessageWriter( + Collections.singletonList(new JacksonJsonEncoder())); + + @Test + public void nullMimeType() { + assertTrue(converter.canWrite(ResolvableType.forClass(Object.class), null)); + } + + @Test + public void unsupportedMimeType() { + assertFalse(converter.canWrite(ResolvableType.forClass(Object.class), + new MediaType("foo", "bar"))); + } + + @Test + public void supportedMimeType() { + assertTrue(converter.canWrite(ResolvableType.forClass(Object.class), + new MediaType("text", "event-stream"))); + } + + @Test + public void encodeServerSentEvent() { + SseEvent event = new SseEvent(); + event.setId("c42"); + event.setName("foo"); + event.setComment("bla\nbla bla\nbla bla bla"); + event.setReconnectTime(123L); + Mono source = Mono.just(event); + MockServerHttpResponse outputMessage = new MockServerHttpResponse(); + converter.write(source, ResolvableType.forClass(SseEvent.class), + new MediaType("text", "event-stream"), outputMessage); + + Publisher> result = outputMessage.getBodyWithFlush(); + TestSubscriber.subscribe(result). + assertNoError(). + assertValuesWith(publisher -> { + TestSubscriber.subscribe(publisher).assertNoError().assertValuesWith( + stringConsumer("id:c42\n" + "event:foo\n" + "retry:123\n" + + ":bla\n:bla bla\n:bla bla bla\n"), + stringConsumer("\n")); + + }); + } + + @Test + public void encodeString() { + Flux source = Flux.just("foo", "bar"); + MockServerHttpResponse outputMessage = new MockServerHttpResponse(); + converter.write(source, ResolvableType.forClass(String.class), + new MediaType("text", "event-stream"), outputMessage); + + Publisher> result = outputMessage.getBodyWithFlush(); + TestSubscriber.subscribe(result). + assertNoError(). + assertValuesWith(publisher -> { + TestSubscriber.subscribe(publisher).assertNoError() + .assertValuesWith(stringConsumer("data:foo\n"), + stringConsumer("\n")); + + }, publisher -> { + TestSubscriber.subscribe(publisher).assertNoError() + .assertValuesWith(stringConsumer("data:bar\n"), + stringConsumer("\n")); + + }); + } + + @Test + public void encodeMultiLineString() { + Flux source = Flux.just("foo\nbar", "foo\nbaz"); + MockServerHttpResponse outputMessage = new MockServerHttpResponse(); + converter.write(source, ResolvableType.forClass(String.class), + new MediaType("text", "event-stream"), outputMessage); + + Publisher> result = outputMessage.getBodyWithFlush(); + TestSubscriber.subscribe(result). + assertNoError(). + assertValuesWith(publisher -> { + TestSubscriber.subscribe(publisher).assertNoError() + .assertValuesWith(stringConsumer("data:foo\ndata:bar\n"), + stringConsumer("\n")); + + }, publisher -> { + TestSubscriber.subscribe(publisher).assertNoError() + .assertValuesWith(stringConsumer("data:foo\ndata:baz\n"), + stringConsumer("\n")); + + }); + } + + @Test + public void encodePojo() { + Flux source = Flux.just(new Pojo("foofoo", "barbar"), + new Pojo("foofoofoo", "barbarbar")); + MockServerHttpResponse outputMessage = new MockServerHttpResponse(); + converter.write(source, ResolvableType.forClass(Pojo.class), + new MediaType("text", "event-stream"), outputMessage); + + Publisher> result = outputMessage.getBodyWithFlush(); + TestSubscriber.subscribe(result). + assertNoError(). + assertValuesWith(publisher -> { + TestSubscriber.subscribe(publisher).assertNoError() + .assertValuesWith(stringConsumer("data:"), stringConsumer( + "{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"), + stringConsumer("\n"), stringConsumer("\n")); + + }, publisher -> { + TestSubscriber.subscribe(publisher).assertNoError() + .assertValuesWith(stringConsumer("data:"), stringConsumer( + "{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"), + stringConsumer("\n"), stringConsumer("\n")); + + }); + } + +} diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java index 5f1222ef80..d97bc276e7 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java @@ -18,18 +18,20 @@ package org.springframework.http.server.reactive; import org.junit.Before; import org.junit.Test; - +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.TestSubscriber; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.FlushingDataBuffer; import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.client.reactive.ClientWebRequestBuilders; import org.springframework.web.client.reactive.ResponseExtractors; import org.springframework.web.client.reactive.WebClient; +import static org.junit.Assume.assumeFalse; + /** * @author Sebastien Deleuze */ @@ -59,19 +61,16 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest .assertValues("data0data1"); } - @Override protected HttpHandler createHttpHandler() { return new FlushingHandler(); } - // Handler that never completes designed to test if flushing is perform correctly when - // a FlushingDataBuffer is written private static class FlushingHandler implements HttpHandler { @Override public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { - Flux responseBody = Flux + Flux> responseBody = Flux .intervalMillis(50) .map(l -> { byte[] data = ("data" + l).getBytes(); @@ -80,9 +79,11 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest return buffer; }) .take(2) - .concatWith(Mono.just(FlushingDataBuffer.INSTANCE)) - .concatWith(Flux.never()); - return response.writeWith(responseBody); + .map(Flux::just); + + responseBody = responseBody.concatWith(Flux.never()); + + return response.writeAndFlushWith(responseBody); } } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/MockServerHttpResponse.java b/spring-web/src/test/java/org/springframework/http/server/reactive/MockServerHttpResponse.java index 3ffafd090c..75fdf7ec7b 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/MockServerHttpResponse.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/MockServerHttpResponse.java @@ -45,6 +45,8 @@ public class MockServerHttpResponse implements ServerHttpResponse { private Publisher body; + private Publisher> bodyWithFlushes; + private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); @@ -72,12 +74,22 @@ public class MockServerHttpResponse implements ServerHttpResponse { return this.body; } + public Publisher> getBodyWithFlush() { + return this.bodyWithFlushes; + } + @Override public Mono writeWith(Publisher body) { this.body = body; return Flux.from(this.body).then(); } + @Override + public Mono writeAndFlushWith(Publisher> body) { + this.bodyWithFlushes = body; + return Flux.from(this.bodyWithFlushes).then(); + } + @Override public void beforeCommit(Supplier> action) { } @@ -91,5 +103,4 @@ public class MockServerHttpResponse implements ServerHttpResponse { public DataBufferFactory bufferFactory() { return this.dataBufferFactory; } - } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java index 32dab6a847..e143b67e51 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java @@ -182,6 +182,12 @@ public class ServerHttpResponseTests { return b; }).then(); } + + @Override + protected Mono writeAndFlushWithInternal( + Publisher> body) { + return Mono.error(new UnsupportedOperationException()); + } } }