diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java index d290b0d695..5eab1e666f 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java @@ -246,7 +246,7 @@ class RSocketBufferLeakTests { void checkForLeaks() { this.rsockets.stream().map(PayloadSavingDecorator::getPayloads) .forEach(payloadInfoProcessor -> { - payloadInfoProcessor.emitComplete(); + payloadInfoProcessor.tryEmitComplete(); payloadInfoProcessor.asFlux() .doOnNext(this::checkForLeak) .blockLast(); @@ -328,7 +328,7 @@ class RSocketBufferLeakTests { } private io.rsocket.Payload addPayload(io.rsocket.Payload payload) { - this.payloads.emitNext(new PayloadLeakInfo(payload)); + this.payloads.tryEmitNext(new PayloadLeakInfo(payload)); return payload; } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java index 68210b3afd..5848af5128 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java @@ -231,7 +231,7 @@ public class RSocketClientToServerIntegrationTests { @MessageMapping("receive") void receive(String payload) { - this.fireForgetPayloads.emitNext(payload); + this.fireForgetPayloads.tryEmitNext(payload); } @MessageMapping("echo") @@ -273,7 +273,7 @@ public class RSocketClientToServerIntegrationTests { @ConnectMapping("foo-updates") public void handleMetadata(@Header("foo") String foo) { - this.metadataPushPayloads.emitNext(foo); + this.metadataPushPayloads.tryEmitNext(foo); } @MessageExceptionHandler diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index 5756622d7b..049e23fdd5 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -219,7 +219,7 @@ class RSocketServerToClientIntegrationTests { @MessageMapping("receive") void receive(String payload) { - this.fireForgetPayloads.emitNext(payload); + this.fireForgetPayloads.tryEmitNext(payload); } @MessageMapping("echo") diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java index f343e27654..8b9dbd4716 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java @@ -371,7 +371,7 @@ public class SimpAnnotationMethodMessageHandlerTests { this.messageHandler.handleMessage(message); assertThat(controller.fluxSink).isNotNull(); - controller.fluxSink.emitNext("foo"); + controller.fluxSink.tryEmitNext("foo"); verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class)); } diff --git a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt index c899ef0c31..4f47e7b9a8 100644 --- a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt +++ b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt @@ -149,13 +149,13 @@ class RSocketClientToServerCoroutinesIntegrationTests { @MessageMapping("receive") fun receive(payload: String) { - fireForgetPayloads.emitNext(payload) + fireForgetPayloads.tryEmitNext(payload) } @MessageMapping("receive-async") suspend fun receiveAsync(payload: String) { delay(10) - fireForgetPayloads.emitNext(payload) + fireForgetPayloads.tryEmitNext(payload) } @MessageMapping("echo-async") diff --git a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java index c3318ffa43..a412102f9c 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java @@ -209,7 +209,7 @@ public class MultipartHttpMessageWriterTests extends AbstractLeakCheckingTests { public void singleSubscriberWithResource() throws IOException { Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); Resource logo = new ClassPathResource("/org/springframework/http/converter/logo.jpg"); - sink.emitNext(logo); + sink.tryEmitNext(logo); MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder(); bodyBuilder.asyncPart("logo", sink.asFlux(), Resource.class); diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java index dbd2632928..ae3d4b2bde 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java @@ -140,9 +140,9 @@ public class ReactiveTypeHandlerTests { Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); testDeferredResultSubscriber(sink.asFlux(), Flux.class, forClass(Bar.class), () -> { - sink.emitNext(bar1); - sink.emitNext(bar2); - sink.emitComplete(); + sink.tryEmitNext(bar1); + sink.tryEmitNext(bar2); + sink.tryEmitComplete(); }, Arrays.asList(bar1, bar2)); } @@ -195,10 +195,10 @@ public class ReactiveTypeHandlerTests { EmitterHandler emitterHandler = new EmitterHandler(); sseEmitter.initialize(emitterHandler); - sink.emitNext("foo"); - sink.emitNext("bar"); - sink.emitNext("baz"); - sink.emitComplete(); + sink.tryEmitNext("foo"); + sink.tryEmitNext("bar"); + sink.tryEmitNext("baz"); + sink.tryEmitComplete(); assertThat(emitterHandler.getValuesAsText()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n"); } @@ -214,10 +214,10 @@ public class ReactiveTypeHandlerTests { EmitterHandler emitterHandler = new EmitterHandler(); sseEmitter.initialize(emitterHandler); - sink.emitNext(ServerSentEvent.builder("foo").id("1").build()); - sink.emitNext(ServerSentEvent.builder("bar").id("2").build()); - sink.emitNext(ServerSentEvent.builder("baz").id("3").build()); - sink.emitComplete(); + sink.tryEmitNext(ServerSentEvent.builder("foo").id("1").build()); + sink.tryEmitNext(ServerSentEvent.builder("bar").id("2").build()); + sink.tryEmitNext(ServerSentEvent.builder("baz").id("3").build()); + sink.tryEmitComplete(); assertThat(emitterHandler.getValuesAsText()).isEqualTo("id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n"); } @@ -239,9 +239,9 @@ public class ReactiveTypeHandlerTests { Bar bar1 = new Bar("foo"); Bar bar2 = new Bar("bar"); - sink.emitNext(bar1); - sink.emitNext(bar2); - sink.emitComplete(); + sink.tryEmitNext(bar1); + sink.tryEmitNext(bar2); + sink.tryEmitComplete(); assertThat(message.getHeaders().getContentType().toString()).isEqualTo("application/x-ndjson"); assertThat(emitterHandler.getValues()).isEqualTo(Arrays.asList(bar1, "\n", bar2, "\n")); @@ -256,10 +256,10 @@ public class ReactiveTypeHandlerTests { EmitterHandler emitterHandler = new EmitterHandler(); emitter.initialize(emitterHandler); - sink.emitNext("The quick"); - sink.emitNext(" brown fox jumps over "); - sink.emitNext("the lazy dog"); - sink.emitComplete(); + sink.tryEmitNext("The quick"); + sink.tryEmitNext(" brown fox jumps over "); + sink.tryEmitNext("the lazy dog"); + sink.tryEmitComplete(); assertThat(emitterHandler.getValuesAsText()).isEqualTo("The quick brown fox jumps over the lazy dog"); } diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java index 62fed8fa03..d17d0cad31 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java @@ -233,10 +233,10 @@ public class ResponseBodyEmitterReturnValueHandlerTests { assertThat(this.request.isAsyncStarted()).isTrue(); assertThat(this.response.getStatus()).isEqualTo(200); - sink.emitNext("foo"); - sink.emitNext("bar"); - sink.emitNext("baz"); - sink.emitComplete(); + sink.tryEmitNext("foo"); + sink.tryEmitNext("bar"); + sink.tryEmitNext("baz"); + sink.tryEmitComplete(); assertThat(this.response.getContentType()).isEqualTo("text/event-stream"); assertThat(this.response.getContentAsString()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n"); @@ -254,8 +254,8 @@ public class ResponseBodyEmitterReturnValueHandlerTests { assertThat(this.request.isAsyncStarted()).isTrue(); IllegalStateException ex = new IllegalStateException("wah wah"); - sink.emitError(ex); - sink.emitComplete(); + sink.tryEmitError(ex); + sink.tryEmitComplete(); WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(this.webRequest); assertThat(asyncManager.getConcurrentResult()).isSameAs(ex); @@ -299,10 +299,10 @@ public class ResponseBodyEmitterReturnValueHandlerTests { assertThat(this.request.isAsyncStarted()).isTrue(); assertThat(this.response.getStatus()).isEqualTo(200); - sink.emitNext("foo"); - sink.emitNext("bar"); - sink.emitNext("baz"); - sink.emitComplete(); + sink.tryEmitNext("foo"); + sink.tryEmitNext("bar"); + sink.tryEmitNext("baz"); + sink.tryEmitComplete(); assertThat(this.response.getContentType()).isEqualTo("text/plain"); assertThat(this.response.getContentAsString()).isEqualTo("foobarbaz");