diff --git a/spring-core/src/main/java/org/springframework/core/codec/Decoder.java b/spring-core/src/main/java/org/springframework/core/codec/Decoder.java index 5fd981d6ac..51a943e213 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/Decoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/Decoder.java @@ -23,6 +23,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; @@ -92,7 +93,7 @@ public interface Decoder { default T decode(DataBuffer buffer, ResolvableType targetType, @Nullable MimeType mimeType, @Nullable Map hints) throws DecodingException { - MonoProcessor processor = MonoProcessor.create(); + MonoProcessor processor = MonoProcessor.fromSink(Sinks.one()); decodeToMono(Mono.just(buffer), targetType, mimeType, hints).subscribeWith(processor); Assert.state(processor.isTerminated(), "DataBuffer decoding should have completed."); diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/MonoToListenableFutureAdapter.java b/spring-core/src/main/java/org/springframework/util/concurrent/MonoToListenableFutureAdapter.java index 136b86aa25..61901abda6 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/MonoToListenableFutureAdapter.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/MonoToListenableFutureAdapter.java @@ -33,6 +33,7 @@ import org.springframework.util.Assert; * @since 5.1 * @param the object type */ +@SuppressWarnings("deprecation") public class MonoToListenableFutureAdapter implements ListenableFuture { private final MonoProcessor processor; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.java index 21c002bf98..4f6e203e16 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.java @@ -28,6 +28,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; @@ -162,7 +163,7 @@ class MessagingRSocket implements RSocket { } private Flux handleAndReply(Payload firstPayload, FrameType frameType, Flux payloads) { - MonoProcessor> replyMono = MonoProcessor.create(); + MonoProcessor> replyMono = MonoProcessor.fromSink(Sinks.one()); MessageHeaders headers = createHeaders(firstPayload, frameType, replyMono); AtomicBoolean read = new AtomicBoolean(); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index ca42a1ecd3..95579c4cb3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; @@ -204,7 +205,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ } // Report first connect to the ListenableFuture - MonoProcessor connectMono = MonoProcessor.create(); + MonoProcessor connectMono = MonoProcessor.fromSink(Sinks.one()); this.tcpClient .handle(new ReactorNettyHandler(handler)) @@ -315,7 +316,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ logger.debug("Connected to " + conn.address()); } }); - MonoProcessor completion = MonoProcessor.create(); + MonoProcessor completion = MonoProcessor.fromSink(Sinks.one()); TcpConnection

connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); scheduler.schedule(() -> this.connectionHandler.afterConnected(connection)); diff --git a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java index 6281bf6cc6..f52e8668fa 100644 --- a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java +++ b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java @@ -26,6 +26,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -64,7 +65,7 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { super(dataBufferFactory); this.writeHandler = body -> { // Avoid .then() which causes data buffers to be released - MonoProcessor completion = MonoProcessor.create(); + MonoProcessor completion = MonoProcessor.fromSink(Sinks.one()); this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache(); this.body.subscribe(); return completion; diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java index ee23babdc4..6393033339 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java @@ -25,6 +25,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; import org.springframework.core.io.buffer.DataBuffer; @@ -83,8 +84,8 @@ public class HttpHandlerConnector implements ClientHttpConnector { private Mono doConnect( HttpMethod httpMethod, URI uri, Function> requestCallback) { - MonoProcessor requestWriteCompletion = MonoProcessor.create(); - MonoProcessor handlerCompletion = MonoProcessor.create(); + MonoProcessor requestWriteCompletion = MonoProcessor.fromSink(Sinks.one()); + MonoProcessor handlerCompletion = MonoProcessor.fromSink(Sinks.one()); ClientHttpResponse[] savedResponse = new ClientHttpResponse[1]; MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri); diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java index 47b035e23f..b904da963d 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java @@ -27,6 +27,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DefaultDataBufferFactory; @@ -132,7 +133,7 @@ class WiretapConnector implements ClientHttpConnector { private final DataBuffer buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer(); - private final MonoProcessor content = MonoProcessor.create(); + private final MonoProcessor content = MonoProcessor.fromSink(Sinks.one()); private boolean hasContentConsumer; diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java index 07911b1dcd..26a9e0cb64 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.stream.Collectors; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.core.DefaultParameterNameDiscoverer; import org.springframework.core.ParameterNameDiscoverer; @@ -102,7 +103,7 @@ public class SyncInvocableHandlerMethod extends HandlerMethod { public HandlerResult invokeForHandlerResult(ServerWebExchange exchange, BindingContext bindingContext, Object... providedArgs) { - MonoProcessor processor = MonoProcessor.create(); + MonoProcessor processor = MonoProcessor.fromSink(Sinks.one()); this.delegate.invoke(exchange, bindingContext, providedArgs).subscribeWith(processor); if (processor.isTerminated()) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java index 05a3fda827..f61a87c3a7 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java @@ -25,6 +25,7 @@ import java.util.Optional; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.beans.BeanUtils; import org.springframework.core.DefaultParameterNameDiscoverer; @@ -116,7 +117,7 @@ public class ModelAttributeMethodArgumentResolver extends HandlerMethodArgumentR Mono valueMono = prepareAttributeMono(name, valueType, context, exchange); Map model = context.getModel().asMap(); - MonoProcessor bindingResultMono = MonoProcessor.create(); + MonoProcessor bindingResultMono = MonoProcessor.fromSink(Sinks.one()); model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResultMono); return valueMono.flatMap(value -> { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 52c29ecc72..3cef0fbcf2 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -26,6 +26,7 @@ import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import reactor.util.concurrent.Queues; import org.springframework.core.io.buffer.DataBufferFactory; @@ -73,7 +74,7 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc private final AtomicBoolean sendCalled = new AtomicBoolean(); - private final MonoProcessor closeStatusProcessor = MonoProcessor.create(); + private final MonoProcessor closeStatusProcessor = MonoProcessor.fromSink(Sinks.one()); /** diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java index 691551736b..ee18a456bf 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java @@ -27,6 +27,7 @@ import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.io.UpgradeListener; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.context.Lifecycle; import org.springframework.core.io.buffer.DefaultDataBufferFactory; @@ -136,7 +137,7 @@ public class JettyWebSocketClient implements WebSocketClient, Lifecycle { } private Mono executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) { - MonoProcessor completionMono = MonoProcessor.create(); + MonoProcessor completionMono = MonoProcessor.fromSink(Sinks.one()); return Mono.fromCallable( () -> { if (logger.isDebugEnabled()) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java index 41bbc7697c..b46b30e059 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; import org.springframework.core.io.buffer.DataBufferFactory; @@ -95,7 +96,7 @@ public class StandardWebSocketClient implements WebSocketClient { } private Mono executeInternal(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) { - MonoProcessor completionMono = MonoProcessor.create(); + MonoProcessor completionMono = MonoProcessor.fromSink(Sinks.one()); return Mono.fromCallable( () -> { if (logger.isDebugEnabled()) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java index 7b3cbe50e0..b83c73f4bf 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java @@ -34,6 +34,7 @@ import org.xnio.IoFuture; import org.xnio.XnioWorker; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; @@ -72,7 +73,8 @@ public class UndertowWebSocketClient implements WebSocketClient { * @param worker the Xnio worker */ public UndertowWebSocketClient(XnioWorker worker) { - this(worker, builder -> {}); + this(worker, builder -> { + }); } /** @@ -153,7 +155,7 @@ public class UndertowWebSocketClient implements WebSocketClient { } private Mono executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) { - MonoProcessor completion = MonoProcessor.create(); + MonoProcessor completion = MonoProcessor.fromSink(Sinks.one()); return Mono.fromCallable( () -> { if (logger.isDebugEnabled()) {