diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/access/StateMachineAccess.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/access/StateMachineAccess.java index 01770177..8e52b824 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/access/StateMachineAccess.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/access/StateMachineAccess.java @@ -42,7 +42,9 @@ public interface StateMachineAccess extends ReactiveStateMachineAccess stateMachineContext); /** diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/DistributedStateMachine.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/DistributedStateMachine.java index bbc17b7b..cbc6d30f 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/DistributedStateMachine.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/DistributedStateMachine.java @@ -296,7 +296,7 @@ public class DistributedStateMachine extends LifecycleObjectSupport implem log.debug("Joining with context " + context); } - delegate.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(context)); + delegate.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(context).block()); } log.info("Requesting to start delegating state machine " + delegate); log.info("Delegating machine id " + delegate.getUuid()); diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/persist/AbstractStateMachinePersister.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/persist/AbstractStateMachinePersister.java index 3f8608dd..ab0921a3 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/persist/AbstractStateMachinePersister.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/persist/AbstractStateMachinePersister.java @@ -67,7 +67,7 @@ public abstract class AbstractStateMachinePersister implements StateMac public final StateMachine restore(StateMachine stateMachine, T contextObj) throws Exception { final StateMachineContext context = stateMachinePersist.read(contextObj); stateMachine.stopReactively().block(); - stateMachine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(context)); + stateMachine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(context).block()); stateMachine.startReactively().block(); return stateMachine; } diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/service/DefaultStateMachineService.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/service/DefaultStateMachineService.java index 0fb67ff8..0185cc17 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/service/DefaultStateMachineService.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/service/DefaultStateMachineService.java @@ -166,7 +166,7 @@ public class DefaultStateMachineService implements StateMachineService function.resetStateMachine(stateMachineContext)); + stateMachine.getStateMachineAccessor().doWithRegion(function -> function.resetStateMachineReactively(stateMachineContext).block()); return stateMachine; } diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java index 5522e71b..ec12d739 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java @@ -763,20 +763,16 @@ public abstract class AbstractStateMachine extends StateMachineObjectSuppo .then(); mono = mono.then(resetMono); } else if (s.isOrthogonal() && stateMachineContext.getChilds() != null) { - Collection> regions = ((AbstractState)s).getRegions(); - Mono resetMono = Flux.fromIterable(regions) - .flatMap(region -> { - return Flux.fromIterable(stateMachineContext.getChilds()) - .flatMap(child -> { - return Mono.fromRunnable(() -> { - ((StateMachine)region).getStateMachineAccessor() - .doWithRegion(function -> function.resetStateMachine(child)); - }); - }) - .then(); - }) + Collection> regions = ((AbstractState) s).getRegions(); + Mono resetMono = Flux.fromIterable(regions).flatMap(region -> + Flux.fromIterable(stateMachineContext.getChilds()) + .flatMap(child -> + ((StateMachine) region).getStateMachineAccessor().withRegion().resetStateMachineReactively(child) + ) + .then() + ) .then(); - mono = mono.then(resetMono); + mono = mono.thenEmpty(resetMono); } if (log.isDebugEnabled()) { @@ -787,20 +783,19 @@ public abstract class AbstractStateMachine extends StateMachineObjectSuppo break; } else if (stateMachineContext.getChilds() != null && !stateMachineContext.getChilds().isEmpty()) { if (s.isOrthogonal()) { - Collection> regions = ((AbstractState)s).getRegions(); - Mono resetMono = Flux.fromIterable(regions) - .flatMap(region -> { - return Flux.fromIterable(stateMachineContext.getChilds()) + Collection> regions = ((AbstractState) s).getRegions(); + Mono resetMono = Flux.fromIterable(regions).flatMap(region -> + Flux.fromIterable(stateMachineContext.getChilds()) .flatMap(child -> { - return Mono.fromRunnable(() -> { - if (ObjectUtils.nullSafeEquals(region.getId(), child.getId())) { - ((StateMachine)region).getStateMachineAccessor() - .doWithRegion(function -> function.resetStateMachine(child)); - } - }); + if (ObjectUtils.nullSafeEquals(region.getId(), child.getId())) { + return ((StateMachine) region).getStateMachineAccessor() + .withRegion().resetStateMachineReactively(child); + } else { + return Mono.empty(); + } }) - .then(); - }) + .then() + ) .then(); monos.add(resetMono); } else { diff --git a/spring-statemachine-core/src/test/java/org/springframework/statemachine/StateMachineResetTests.java b/spring-statemachine-core/src/test/java/org/springframework/statemachine/StateMachineResetTests.java index 4655057e..e521ee65 100644 --- a/spring-statemachine-core/src/test/java/org/springframework/statemachine/StateMachineResetTests.java +++ b/spring-statemachine-core/src/test/java/org/springframework/statemachine/StateMachineResetTests.java @@ -77,7 +77,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests { ExtendedState extendedState = new DefaultExtendedState(variables); DefaultStateMachineContext stateMachineContext = new DefaultStateMachineContext(States.S12, Events.I, null, extendedState); - machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext)); + machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block()); doStartAndAssert(machine); assertThat(machine.getState().getIds()).containsOnly(States.S0, States.S1, States.S12); @@ -95,7 +95,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests { ExtendedState extendedState = new DefaultExtendedState(variables); DefaultStateMachineContext stateMachineContext = new DefaultStateMachineContext(States.S211, Events.C, null, extendedState); - machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext)); + machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block()); doStartAndAssert(machine); assertThat(machine.getState().getIds()).containsOnly(States.S0, States.S2, States.S21, States.S211); @@ -113,7 +113,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests { ExtendedState extendedState = new DefaultExtendedState(variables); DefaultStateMachineContext stateMachineContext = new DefaultStateMachineContext(States.S2, Events.C, null, extendedState); - machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext)); + machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block()); doStartAndAssert(machine); assertThat(machine.getState().getIds()).containsOnly(States.S0, States.S2, States.S21, States.S211); @@ -138,7 +138,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests { DefaultStateMachineContext stateMachineContext = new DefaultStateMachineContext(childs, TestStates.S2, TestEvents.E1, null, null); - machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext)); + machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block()); doStartAndAssert(machine); assertThat(machine.getState().getIds()).containsOnly(TestStates.S2, TestStates.S21, TestStates.S31); @@ -162,7 +162,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests { DefaultStateMachineContext stateMachineContext = new DefaultStateMachineContext(childs, TestStates.S2, null, null, null); - machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext)); + machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block()); doStartAndAssert(machine); assertThat(machine.getState().getIds()).containsOnly(TestStates.S2, TestStates.S21, TestStates.S31); @@ -184,7 +184,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests { ExtendedState extendedState = new DefaultExtendedState(variables); DefaultStateMachineContext stateMachineContext = new DefaultStateMachineContext(States.S0, null, null, extendedState); - machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext)); + machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block()); doStartAndAssert(machine); assertThat((Integer)machine.getExtendedState().getVariables().get("count")).isEqualTo(1); @@ -207,7 +207,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests { assertThat((Integer)machine.getExtendedState().getVariables().get("foo")).isZero(); doStopAndAssert(machine); - machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(null)); + machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(null).block()); doStartAndAssert(machine); assertThat(machine.getState().getIds()).containsOnly(States.S0, States.S1, States.S11); assertThat(machine.getExtendedState().getVariables()).isEmpty(); @@ -229,7 +229,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests { DefaultStateMachineContext stateMachineContext = new DefaultStateMachineContext( States.S11, null, null, null); machine.getStateMachineAccessor() - .doWithAllRegions(function -> function.resetStateMachine(stateMachineContext)); + .doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block()); doStartAndAssert(machine); assertThat(machine.getState().getIds()).containsOnly(States.S0, States.S1, States.S11); @@ -246,7 +246,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests { DefaultStateMachineContext stateMachineContext = new DefaultStateMachineContext(States.S1, null, null, null); - machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext)); + machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block()); doStartAndAssert(machine); Thread.sleep(1100); @@ -290,7 +290,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests { ExtendedState extendedState = new DefaultExtendedState(variables); DefaultStateMachineContext stateMachineContext = new DefaultStateMachineContext(States.S0, null, null, extendedState); - machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext)); + machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block()); doStartAndAssert(machine); assertThat((Integer)machine.getExtendedState().getVariables().get("count1")).isEqualTo(1); @@ -321,7 +321,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests { DefaultStateMachineContext stateMachineContext = new DefaultStateMachineContext( SubState.SUB_NEXT, null, null, null); - machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext)); + machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block()); doStartAndAssert(machine); assertThat(machine.getState().getIds()).containsOnly(SuperState.PARENT, SubState.SUB_NEXT); @@ -337,7 +337,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests { DefaultStateMachineContext stateMachineContext = new DefaultStateMachineContext( SuperState.INITIAL, null, null, null); - machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext)); + machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block()); doStartAndAssert(machine); assertThat(machine.getState().getIds()).containsOnly(SuperState.INITIAL); diff --git a/spring-statemachine-recipes/src/main/java/org/springframework/statemachine/recipes/persist/AbstractPersistStateMachineHandler.java b/spring-statemachine-recipes/src/main/java/org/springframework/statemachine/recipes/persist/AbstractPersistStateMachineHandler.java index 9e7e658b..0d8fbace 100644 --- a/spring-statemachine-recipes/src/main/java/org/springframework/statemachine/recipes/persist/AbstractPersistStateMachineHandler.java +++ b/spring-statemachine-recipes/src/main/java/org/springframework/statemachine/recipes/persist/AbstractPersistStateMachineHandler.java @@ -24,6 +24,7 @@ import org.springframework.statemachine.support.DefaultStateMachineContext; import org.springframework.statemachine.support.LifecycleObjectSupport; import org.springframework.statemachine.support.StateMachineInterceptorAdapter; import org.springframework.statemachine.transition.Transition; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.Iterator; @@ -63,7 +64,7 @@ public abstract class AbstractPersistStateMachineHandler extends Lifecycle stateMachine.stopReactively().block(); List> withAllRegions = stateMachine.getStateMachineAccessor().withAllRegions(); for (StateMachineAccess a : withAllRegions) { - a.resetStateMachine(new DefaultStateMachineContext(state, null, null, null)); + a.resetStateMachineReactively(new DefaultStateMachineContext(state, null, null, null)).block(); } stateMachine.startReactively().block(); return stateMachine.sendEvent(event); @@ -77,18 +78,18 @@ public abstract class AbstractPersistStateMachineHandler extends Lifecycle * @return mono for completion */ public Mono handleEventWithStateReactively(Message event, S state) { - StateMachine stateMachine = getInitStateMachine(); - // TODO: REACTOR add docs and revisit this function concept - return Mono.from(stateMachine.stopReactively()) - .then(Mono.fromRunnable(() -> { - List> withAllRegions = stateMachine.getStateMachineAccessor().withAllRegions(); - for (StateMachineAccess a : withAllRegions) { - a.resetStateMachine(new DefaultStateMachineContext(state, null, null, null)); - } - })) - .then(stateMachine.startReactively()) - .thenMany(stateMachine.sendEvent(Mono.just(event))) - .then(); + return Mono.defer(() -> { + StateMachine stateMachine = getInitStateMachine(); + // TODO: REACTOR add docs and revisit this function concept + return Mono.from(stateMachine.stopReactively()) + .thenEmpty( + Flux.fromIterable(stateMachine.getStateMachineAccessor().withAllRegions()) + .flatMap(region -> region.resetStateMachineReactively(new DefaultStateMachineContext(state, null, null, null))) + ) + .then(stateMachine.startReactively()) + .thenMany(stateMachine.sendEvent(Mono.just(event))) + .then(); + }); } /** diff --git a/spring-statemachine-recipes/src/main/java/org/springframework/statemachine/recipes/tasks/TasksHandler.java b/spring-statemachine-recipes/src/main/java/org/springframework/statemachine/recipes/tasks/TasksHandler.java index 808fb9e2..a2cce9ba 100644 --- a/spring-statemachine-recipes/src/main/java/org/springframework/statemachine/recipes/tasks/TasksHandler.java +++ b/spring-statemachine-recipes/src/main/java/org/springframework/statemachine/recipes/tasks/TasksHandler.java @@ -166,7 +166,7 @@ public class TasksHandler { } stateMachine.stopReactively().block(); - stateMachine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(context)); + stateMachine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(context).block()); stateMachine.startReactively().block(); }