From 96eab9d061113731fafb65f1d4e5fd5541264129 Mon Sep 17 00:00:00 2001 From: Janne Valkealahti Date: Sat, 31 Oct 2020 09:02:46 +0000 Subject: [PATCH] Fix deprecations - Due to switch to boot 2.4.x, fixing various deprecations where things mostly come from reactor. --- .../support/ReactiveLifecycleManager.java | 28 ++++++++-------- .../support/ReactiveStateMachineExecutor.java | 32 +++++++++---------- .../support/StateMachineUtils.java | 2 +- .../src/main/java/demo/persist/Persist.java | 6 ++-- 4 files changed, 33 insertions(+), 35 deletions(-) diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveLifecycleManager.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveLifecycleManager.java index 1f7f9775..2ddc0f31 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveLifecycleManager.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveLifecycleManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,16 +23,18 @@ import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; +import reactor.util.concurrent.Queues; public class ReactiveLifecycleManager implements StateMachineReactiveLifecycle { private static final Log log = LogFactory.getLog(ReactiveLifecycleManager.class); private final AtomicEnum state = new AtomicEnum(LifecycleState.STOPPED); - private EmitterProcessor> startRequestsProcessor; - private EmitterProcessor> stopRequestsProcessor; + private Many> startRequestsSink; + private Many> stopRequestsSink; private Flux> startRequests; private Flux> stopRequests; private Supplier> preStartRequest; @@ -55,10 +57,10 @@ public class ReactiveLifecycleManager implements StateMachineReactiveLifecycle { this.preStopRequest = preStopRequest; this.postStartRequest = postStartRequest; this.postStopRequest = postStopRequest; - this.startRequestsProcessor = EmitterProcessor.>create(false); - this.stopRequestsProcessor = EmitterProcessor.>create(false); - this.startRequests = this.startRequestsProcessor.cache(1); - this.stopRequests = this.stopRequestsProcessor.cache(1); + this.startRequestsSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false); + this.stopRequestsSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false); + this.startRequests = this.startRequestsSink.asFlux().cache(1); + this.stopRequests = this.stopRequestsSink.asFlux().cache(1); } @Override @@ -69,8 +71,7 @@ public class ReactiveLifecycleManager implements StateMachineReactiveLifecycle { .filter(owns -> owns) .flatMap(owns -> this.startRequests.next().flatMap(Function.identity()).doOnSuccess(aVoid -> { state.set(LifecycleState.STARTED); - })) - ; + })); }) .then(Mono.defer(postStartRequest)) .then(Mono.defer(() -> { @@ -79,8 +80,7 @@ public class ReactiveLifecycleManager implements StateMachineReactiveLifecycle { return stopReactively(); } return Mono.empty(); - })) - ; + })); } @Override @@ -145,10 +145,10 @@ public class ReactiveLifecycleManager implements StateMachineReactiveLifecycle { log.debug("Lifecycle from " + expect + " to " + update + " in " + ReactiveLifecycleManager.this); if (update == LifecycleState.STARTING) { log.debug("Next start request with doStartReactively in " + ReactiveLifecycleManager.this); - startRequestsProcessor.onNext(preStartRequest.get()); + startRequestsSink.emitNext(preStartRequest.get(), Sinks.EmitFailureHandler.FAIL_FAST); } else if (update == LifecycleState.STOPPING) { log.debug("Next stop request with doStopReactively in " + ReactiveLifecycleManager.this); - stopRequestsProcessor.onNext(preStopRequest.get()); + stopRequestsSink.emitNext(preStopRequest.get(), Sinks.EmitFailureHandler.FAIL_FAST); } } return set; diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java index 2fd668ba..ac31095a 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java @@ -28,6 +28,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; import java.util.function.Function; import java.util.stream.Collectors; @@ -53,10 +54,11 @@ import org.springframework.statemachine.trigger.Trigger; import org.springframework.statemachine.trigger.TriggerListener; import reactor.core.Disposable; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; +import reactor.util.concurrent.Queues; import reactor.util.context.Context; /** @@ -86,8 +88,7 @@ public class ReactiveStateMachineExecutor extends LifecycleObjectSupport i private volatile Message forwardedInitialEvent; private volatile Message queuedMessage = null; private StateMachineExecutorTransit stateMachineExecutorTransit; - private EmitterProcessor triggerProcessor = EmitterProcessor.create(false); - private FluxSink triggerSink; + private Many triggerSink; private Flux triggerFlux; private Disposable triggerDisposable; @@ -111,8 +112,8 @@ public class ReactiveStateMachineExecutor extends LifecycleObjectSupport i @Override protected void onInit() throws Exception { - triggerSink = triggerProcessor.sink(); - triggerFlux = Flux.from(triggerProcessor).flatMap(trigger -> handleTrigger(trigger)); + triggerSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false); + triggerFlux = triggerSink.asFlux().flatMap(trigger -> handleTrigger(trigger)); } @Override @@ -156,7 +157,7 @@ public class ReactiveStateMachineExecutor extends LifecycleObjectSupport i if (log.isDebugEnabled()) { log.debug("Queue trigger " + trigger); } - triggerSink.next(new TriggerQueueItem(trigger, message, null, null)); + triggerSink.emitNext(new TriggerQueueItem(trigger, message, null, null), Sinks.EmitFailureHandler.FAIL_FAST); } @Override @@ -208,10 +209,8 @@ public class ReactiveStateMachineExecutor extends LifecycleObjectSupport i return messages .flatMap(m -> handleEvent(m, callback, triggerCallback)) .doOnNext(i -> { - try { - triggerSink.next(i); - } catch (Exception e) { - log.error("Unable to handle queued event", e); + while (triggerSink.tryEmitNext(i).isFailure()) { + LockSupport.parkNanos(10); } }) .then() @@ -313,7 +312,7 @@ public class ReactiveStateMachineExecutor extends LifecycleObjectSupport i return ret; }) .onErrorResume(resumeTriggerErrorToContext()) - .and(Mono.subscriberContext() + .and(Mono.deferContextual(Mono::just) .doOnNext(ctx -> { if (queueItem.callback != null) { Optional holder = ctx.getOrEmpty(StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS); @@ -336,11 +335,10 @@ public class ReactiveStateMachineExecutor extends LifecycleObjectSupport i } }); } - - })) - .subscriberContext(Context.of(StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS, - new ExecutorExceptionHolder(), REACTOR_CONTEXT_TRIGGER_ERRORS, new ExecutorExceptionHolder())); + .contextWrite(Context.of( + StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS, new ExecutorExceptionHolder(), + REACTOR_CONTEXT_TRIGGER_ERRORS, new ExecutorExceptionHolder())); } @@ -491,7 +489,7 @@ public class ReactiveStateMachineExecutor extends LifecycleObjectSupport i } private static Function> resumeTriggerErrorToContext() { - return t -> Mono.subscriberContext() + return t -> Mono.deferContextual(Mono::just) .doOnNext(ctx -> { Optional holder = ctx.getOrEmpty(REACTOR_CONTEXT_TRIGGER_ERRORS); holder.ifPresent(h -> { diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/StateMachineUtils.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/StateMachineUtils.java index d0b935ce..94a8d355 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/StateMachineUtils.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/StateMachineUtils.java @@ -204,7 +204,7 @@ public abstract class StateMachineUtils { * @return mono for completion */ public static Function> resumeErrorToContext() { - return t -> Mono.subscriberContext() + return t -> Mono.deferContextual(Mono::just) .doOnNext(ctx -> { Optional holder = ctx.getOrEmpty(StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS); holder.ifPresent(h -> { diff --git a/spring-statemachine-samples/persist/src/main/java/demo/persist/Persist.java b/spring-statemachine-samples/persist/src/main/java/demo/persist/Persist.java index a516d166..561c0315 100644 --- a/spring-statemachine-samples/persist/src/main/java/demo/persist/Persist.java +++ b/spring-statemachine-samples/persist/src/main/java/demo/persist/Persist.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,12 +66,12 @@ public class Persist { //tag::snippetB[] public void change(int order, String event) { - Order o = jdbcTemplate.queryForObject("select id, state from orders where id = ?", new Object[] { order }, + Order o = jdbcTemplate.queryForObject("select id, state from orders where id = ?", new RowMapper() { public Order mapRow(ResultSet rs, int rowNum) throws SQLException { return new Order(rs.getInt("id"), rs.getString("state")); } - }); + }, new Object[] { order }); handler.handleEventWithStateReactively(MessageBuilder .withPayload(event).setHeader("order", order).build(), o.state) .subscribe();