Release cached item in ChannelSendOperator

1. If the write Subscriber cancels with the item cached, release it.

2. If the write Publisher emits an error while the item is cached, when
the write Subscriber subscribes, release the cached item and emit the
error signal.

Closes gh-22720
This commit is contained in:
Rossen Stoyanchev
2019-04-01 17:14:49 -04:00
parent de2a01eee4
commit 9c48d63082
2 changed files with 113 additions and 14 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,25 +16,29 @@
package org.springframework.http.server.reactive;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBufAllocator;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import static org.junit.Assert.*;
/**
* @author Rossen Stoyanchev
@@ -50,9 +54,6 @@ public class ChannelSendOperatorTests {
this.writer = new OneByOneAsyncWriter();
}
private <T> Mono<Void> sendOperator(Publisher<String> source){
return new ChannelSendOperator<>(source, writer::send);
}
@Test
public void errorBeforeFirstItem() throws Exception {
@@ -130,6 +131,66 @@ public class ChannelSendOperatorTests {
assertSame(error, this.writer.error);
}
@Test // gh-22720
public void cancelWhileItemCached() {
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
Mono.fromCallable(() -> {
DataBuffer dataBuffer = bufferFactory.allocateBuffer();
dataBuffer.write("foo", StandardCharsets.UTF_8);
return dataBuffer;
}),
publisher -> {
ZeroDemandSubscriber subscriber = new ZeroDemandSubscriber();
publisher.subscribe(subscriber);
return Mono.never();
});
BaseSubscriber<Void> subscriber = new BaseSubscriber<Void>() {};
operator.subscribe(subscriber);
subscriber.cancel();
bufferFactory.checkForLeaks();
}
@Test // gh-22720
public void errorWhileItemCached() {
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
ZeroDemandSubscriber writeSubscriber = new ZeroDemandSubscriber();
ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
Flux.create(sink -> {
DataBuffer dataBuffer = bufferFactory.allocateBuffer();
dataBuffer.write("foo", StandardCharsets.UTF_8);
sink.next(dataBuffer);
sink.error(new IllegalStateException("err"));
}),
publisher -> {
publisher.subscribe(writeSubscriber);
return Mono.never();
});
operator.subscribe(new BaseSubscriber<Void>() {});
try {
writeSubscriber.signalDemand(1); // Let cached signals ("foo" and error) be published..
}
catch (Throwable ex) {
assertNotNull(ex.getCause());
assertEquals("err", ex.getCause().getMessage());
}
bufferFactory.checkForLeaks();
}
private <T> Mono<Void> sendOperator(Publisher<String> source){
return new ChannelSendOperator<>(source, writer::send);
}
private static class OneByOneAsyncWriter {
@@ -182,4 +243,18 @@ public class ChannelSendOperatorTests {
}
}
private static class ZeroDemandSubscriber extends BaseSubscriber<DataBuffer> {
@Override
protected void hookOnSubscribe(Subscription subscription) {
// Just subscribe without requesting
}
public void signalDemand(long demand) {
upstream().request(demand);
}
}
}