Add writeAndFlushWith to ReactiveHttpOutputMessage
This commit changes the reactive flushing mechanism to use a newly introduced writeAndFlushWith(Publisher<Publisher<DataBuffer>>) on ReactiveHttpOutputMessage instead of using the FlushingDataBuffer. Issue: https://github.com/spring-projects/spring-reactive/issues/125
This commit is contained in:
@@ -1,143 +0,0 @@
|
||||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.codec;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import org.junit.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.TestSubscriber;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.FlushingDataBuffer;
|
||||
import org.springframework.http.codec.json.JacksonJsonEncoder;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
/**
|
||||
* @author Sebastien Deleuze
|
||||
*/
|
||||
public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase {
|
||||
|
||||
@Test
|
||||
public void nullMimeType() {
|
||||
SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
|
||||
assertTrue(encoder.canEncode(ResolvableType.forClass(Object.class), null));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void unsupportedMimeType() {
|
||||
SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
|
||||
assertFalse(encoder.canEncode(ResolvableType.forClass(Object.class), new MimeType("foo", "bar")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void supportedMimeType() {
|
||||
SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
|
||||
assertTrue(encoder.canEncode(ResolvableType.forClass(Object.class), new MimeType("text", "event-stream")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeServerSentEvent() {
|
||||
SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
|
||||
SseEvent event = new SseEvent();
|
||||
event.setId("c42");
|
||||
event.setName("foo");
|
||||
event.setComment("bla\nbla bla\nbla bla bla");
|
||||
event.setReconnectTime(123L);
|
||||
Mono<SseEvent> source = Mono.just(event);
|
||||
Flux<DataBuffer> output = encoder.encode(source, this.bufferFactory,
|
||||
ResolvableType.forClass(SseEvent.class), new MimeType("text", "event-stream"));
|
||||
TestSubscriber
|
||||
.subscribe(output)
|
||||
.assertNoError()
|
||||
.assertValuesWith(
|
||||
stringConsumer(
|
||||
"id:c42\n" +
|
||||
"event:foo\n" +
|
||||
"retry:123\n" +
|
||||
":bla\n:bla bla\n:bla bla bla\n"),
|
||||
stringConsumer("\n"),
|
||||
b -> assertEquals(FlushingDataBuffer.class, b.getClass())
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeString() {
|
||||
SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
|
||||
Flux<String> source = Flux.just("foo", "bar");
|
||||
Flux<DataBuffer> output = encoder.encode(source, this.bufferFactory,
|
||||
ResolvableType.forClass(String.class), new MimeType("text", "event-stream"));
|
||||
TestSubscriber
|
||||
.subscribe(output)
|
||||
.assertNoError()
|
||||
.assertValuesWith(
|
||||
stringConsumer("data:foo\n"),
|
||||
stringConsumer("\n"),
|
||||
b -> assertEquals(FlushingDataBuffer.class, b.getClass()),
|
||||
stringConsumer("data:bar\n"),
|
||||
stringConsumer("\n"),
|
||||
b -> assertEquals(FlushingDataBuffer.class, b.getClass())
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeMultilineString() {
|
||||
SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
|
||||
Flux<String> source = Flux.just("foo\nbar", "foo\nbaz");
|
||||
Flux<DataBuffer> output = encoder.encode(source, this.bufferFactory,
|
||||
ResolvableType.forClass(String.class), new MimeType("text", "event-stream"));
|
||||
TestSubscriber
|
||||
.subscribe(output)
|
||||
.assertNoError()
|
||||
.assertValuesWith(
|
||||
stringConsumer("data:foo\ndata:bar\n"),
|
||||
stringConsumer("\n"),
|
||||
b -> assertEquals(FlushingDataBuffer.class, b.getClass()),
|
||||
stringConsumer("data:foo\ndata:baz\n"),
|
||||
stringConsumer("\n"),
|
||||
b -> assertEquals(FlushingDataBuffer.class, b.getClass())
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodePojo() {
|
||||
SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
|
||||
Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar"));
|
||||
Flux<DataBuffer> output = encoder.encode(source, this.bufferFactory,
|
||||
ResolvableType.forClass(Pojo.class), new MimeType("text", "event-stream"));
|
||||
TestSubscriber
|
||||
.subscribe(output)
|
||||
.assertNoError()
|
||||
.assertValuesWith(
|
||||
stringConsumer("data:"),
|
||||
stringConsumer("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"),
|
||||
stringConsumer("\n"),
|
||||
stringConsumer("\n"),
|
||||
b -> assertEquals(FlushingDataBuffer.class, b.getClass()),
|
||||
stringConsumer("data:"),
|
||||
stringConsumer("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"),
|
||||
stringConsumer("\n"),
|
||||
stringConsumer("\n"),
|
||||
b -> assertEquals(FlushingDataBuffer.class, b.getClass())
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,160 @@
|
||||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.converter.reactive;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.TestSubscriber;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.codec.Pojo;
|
||||
import org.springframework.http.codec.SseEvent;
|
||||
import org.springframework.http.codec.json.JacksonJsonEncoder;
|
||||
import org.springframework.http.server.reactive.MockServerHttpResponse;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* @author Sebastien Deleuze
|
||||
*/
|
||||
public class SseEventHttpMessageWriterTests
|
||||
extends AbstractDataBufferAllocatingTestCase {
|
||||
|
||||
private SseEventHttpMessageWriter converter = new SseEventHttpMessageWriter(
|
||||
Collections.singletonList(new JacksonJsonEncoder()));
|
||||
|
||||
@Test
|
||||
public void nullMimeType() {
|
||||
assertTrue(converter.canWrite(ResolvableType.forClass(Object.class), null));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void unsupportedMimeType() {
|
||||
assertFalse(converter.canWrite(ResolvableType.forClass(Object.class),
|
||||
new MediaType("foo", "bar")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void supportedMimeType() {
|
||||
assertTrue(converter.canWrite(ResolvableType.forClass(Object.class),
|
||||
new MediaType("text", "event-stream")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeServerSentEvent() {
|
||||
SseEvent event = new SseEvent();
|
||||
event.setId("c42");
|
||||
event.setName("foo");
|
||||
event.setComment("bla\nbla bla\nbla bla bla");
|
||||
event.setReconnectTime(123L);
|
||||
Mono<SseEvent> source = Mono.just(event);
|
||||
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
|
||||
converter.write(source, ResolvableType.forClass(SseEvent.class),
|
||||
new MediaType("text", "event-stream"), outputMessage);
|
||||
|
||||
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
|
||||
TestSubscriber.subscribe(result).
|
||||
assertNoError().
|
||||
assertValuesWith(publisher -> {
|
||||
TestSubscriber.subscribe(publisher).assertNoError().assertValuesWith(
|
||||
stringConsumer("id:c42\n" + "event:foo\n" + "retry:123\n" +
|
||||
":bla\n:bla bla\n:bla bla bla\n"),
|
||||
stringConsumer("\n"));
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeString() {
|
||||
Flux<String> source = Flux.just("foo", "bar");
|
||||
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
|
||||
converter.write(source, ResolvableType.forClass(String.class),
|
||||
new MediaType("text", "event-stream"), outputMessage);
|
||||
|
||||
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
|
||||
TestSubscriber.subscribe(result).
|
||||
assertNoError().
|
||||
assertValuesWith(publisher -> {
|
||||
TestSubscriber.subscribe(publisher).assertNoError()
|
||||
.assertValuesWith(stringConsumer("data:foo\n"),
|
||||
stringConsumer("\n"));
|
||||
|
||||
}, publisher -> {
|
||||
TestSubscriber.subscribe(publisher).assertNoError()
|
||||
.assertValuesWith(stringConsumer("data:bar\n"),
|
||||
stringConsumer("\n"));
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeMultiLineString() {
|
||||
Flux<String> source = Flux.just("foo\nbar", "foo\nbaz");
|
||||
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
|
||||
converter.write(source, ResolvableType.forClass(String.class),
|
||||
new MediaType("text", "event-stream"), outputMessage);
|
||||
|
||||
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
|
||||
TestSubscriber.subscribe(result).
|
||||
assertNoError().
|
||||
assertValuesWith(publisher -> {
|
||||
TestSubscriber.subscribe(publisher).assertNoError()
|
||||
.assertValuesWith(stringConsumer("data:foo\ndata:bar\n"),
|
||||
stringConsumer("\n"));
|
||||
|
||||
}, publisher -> {
|
||||
TestSubscriber.subscribe(publisher).assertNoError()
|
||||
.assertValuesWith(stringConsumer("data:foo\ndata:baz\n"),
|
||||
stringConsumer("\n"));
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodePojo() {
|
||||
Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"),
|
||||
new Pojo("foofoofoo", "barbarbar"));
|
||||
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
|
||||
converter.write(source, ResolvableType.forClass(Pojo.class),
|
||||
new MediaType("text", "event-stream"), outputMessage);
|
||||
|
||||
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
|
||||
TestSubscriber.subscribe(result).
|
||||
assertNoError().
|
||||
assertValuesWith(publisher -> {
|
||||
TestSubscriber.subscribe(publisher).assertNoError()
|
||||
.assertValuesWith(stringConsumer("data:"), stringConsumer(
|
||||
"{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"),
|
||||
stringConsumer("\n"), stringConsumer("\n"));
|
||||
|
||||
}, publisher -> {
|
||||
TestSubscriber.subscribe(publisher).assertNoError()
|
||||
.assertValuesWith(stringConsumer("data:"), stringConsumer(
|
||||
"{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"),
|
||||
stringConsumer("\n"), stringConsumer("\n"));
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
@@ -18,18 +18,20 @@ package org.springframework.http.server.reactive;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.TestSubscriber;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.FlushingDataBuffer;
|
||||
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
|
||||
import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer;
|
||||
import org.springframework.web.client.reactive.ClientWebRequestBuilders;
|
||||
import org.springframework.web.client.reactive.ResponseExtractors;
|
||||
import org.springframework.web.client.reactive.WebClient;
|
||||
|
||||
import static org.junit.Assume.assumeFalse;
|
||||
|
||||
/**
|
||||
* @author Sebastien Deleuze
|
||||
*/
|
||||
@@ -59,19 +61,16 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
|
||||
.assertValues("data0data1");
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected HttpHandler createHttpHandler() {
|
||||
return new FlushingHandler();
|
||||
}
|
||||
|
||||
// Handler that never completes designed to test if flushing is perform correctly when
|
||||
// a FlushingDataBuffer is written
|
||||
private static class FlushingHandler implements HttpHandler {
|
||||
|
||||
@Override
|
||||
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
|
||||
Flux<DataBuffer> responseBody = Flux
|
||||
Flux<Publisher<DataBuffer>> responseBody = Flux
|
||||
.intervalMillis(50)
|
||||
.map(l -> {
|
||||
byte[] data = ("data" + l).getBytes();
|
||||
@@ -80,9 +79,11 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
|
||||
return buffer;
|
||||
})
|
||||
.take(2)
|
||||
.concatWith(Mono.just(FlushingDataBuffer.INSTANCE))
|
||||
.concatWith(Flux.never());
|
||||
return response.writeWith(responseBody);
|
||||
.map(Flux::just);
|
||||
|
||||
responseBody = responseBody.concatWith(Flux.never());
|
||||
|
||||
return response.writeAndFlushWith(responseBody);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -45,6 +45,8 @@ public class MockServerHttpResponse implements ServerHttpResponse {
|
||||
|
||||
private Publisher<DataBuffer> body;
|
||||
|
||||
private Publisher<Publisher<DataBuffer>> bodyWithFlushes;
|
||||
|
||||
private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
|
||||
|
||||
|
||||
@@ -72,12 +74,22 @@ public class MockServerHttpResponse implements ServerHttpResponse {
|
||||
return this.body;
|
||||
}
|
||||
|
||||
public Publisher<Publisher<DataBuffer>> getBodyWithFlush() {
|
||||
return this.bodyWithFlushes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> writeWith(Publisher<DataBuffer> body) {
|
||||
this.body = body;
|
||||
return Flux.from(this.body).then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> writeAndFlushWith(Publisher<Publisher<DataBuffer>> body) {
|
||||
this.bodyWithFlushes = body;
|
||||
return Flux.from(this.bodyWithFlushes).then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeCommit(Supplier<? extends Mono<Void>> action) {
|
||||
}
|
||||
@@ -91,5 +103,4 @@ public class MockServerHttpResponse implements ServerHttpResponse {
|
||||
public DataBufferFactory bufferFactory() {
|
||||
return this.dataBufferFactory;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -182,6 +182,12 @@ public class ServerHttpResponseTests {
|
||||
return b;
|
||||
}).then();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> writeAndFlushWithInternal(
|
||||
Publisher<Publisher<DataBuffer>> body) {
|
||||
return Mono.error(new UnsupportedOperationException());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user