Add test case for writeFunction error signal
See gh-22720
This commit is contained in:
@@ -17,6 +17,7 @@
|
||||
package org.springframework.http.server.reactive;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
@@ -33,6 +34,7 @@ import reactor.core.publisher.BaseSubscriber;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.Signal;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
|
||||
@@ -156,7 +158,12 @@ public class ChannelSendOperatorTests {
|
||||
}
|
||||
|
||||
@Test // gh-22720
|
||||
public void errorWhileItemCached() {
|
||||
public void errorFromWriteSourceWhileItemCached() {
|
||||
|
||||
// 1. First item received
|
||||
// 2. writeFunction applied and writeCompletionBarrier subscribed to it
|
||||
// 3. Write Publisher fails right after that and before request(n) from server
|
||||
|
||||
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
|
||||
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
|
||||
ZeroDemandSubscriber writeSubscriber = new ZeroDemandSubscriber();
|
||||
@@ -186,6 +193,30 @@ public class ChannelSendOperatorTests {
|
||||
bufferFactory.checkForLeaks();
|
||||
}
|
||||
|
||||
@Test // gh-22720
|
||||
public void errorFromWriteFunctionWhileItemCached() {
|
||||
|
||||
// 1. First item received
|
||||
// 2. writeFunction applied and writeCompletionBarrier subscribed to it
|
||||
// 3. writeFunction fails, e.g. to flush status and headers, before request(n) from server
|
||||
|
||||
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
|
||||
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
|
||||
|
||||
ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
|
||||
Flux.create(sink -> {
|
||||
DataBuffer dataBuffer = bufferFactory.allocateBuffer();
|
||||
dataBuffer.write("foo", StandardCharsets.UTF_8);
|
||||
sink.next(dataBuffer);
|
||||
}),
|
||||
publisher -> {
|
||||
publisher.subscribe(new ZeroDemandSubscriber());
|
||||
return Mono.error(new IllegalStateException("err"));
|
||||
});
|
||||
|
||||
StepVerifier.create(operator).expectErrorMessage("err").verify(Duration.ofSeconds(5));
|
||||
bufferFactory.checkForLeaks();
|
||||
}
|
||||
|
||||
private <T> Mono<Void> sendOperator(Publisher<String> source){
|
||||
return new ChannelSendOperator<>(source, writer::send);
|
||||
|
||||
Reference in New Issue
Block a user