Replace resetStateMachine with resetStateMachineReactively

- improve AbstractPersistStateMachineHandler#handleEventWithStateReactively
- Backport #949
- Fixess #956

Signed-off-by: xJoeWoo <xjoewoo@gmail.com>
This commit is contained in:
xJoeWoo
2021-04-17 02:06:33 +08:00
committed by Janne Valkealahti
parent caaf5ecc43
commit ec0639555d
8 changed files with 52 additions and 54 deletions

View File

@@ -42,7 +42,9 @@ public interface StateMachineAccess<S, E> extends ReactiveStateMachineAccess<S,
* Reset state machine.
*
* @param stateMachineContext the state machine context
* @see #resetStateMachineReactively(StateMachineContext)
*/
@Deprecated
void resetStateMachine(StateMachineContext<S, E> stateMachineContext);
/**

View File

@@ -296,7 +296,7 @@ public class DistributedStateMachine<S, E> extends LifecycleObjectSupport implem
log.debug("Joining with context " + context);
}
delegate.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(context));
delegate.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(context).block());
}
log.info("Requesting to start delegating state machine " + delegate);
log.info("Delegating machine id " + delegate.getUuid());

View File

@@ -67,7 +67,7 @@ public abstract class AbstractStateMachinePersister<S, E, T> implements StateMac
public final StateMachine<S, E> restore(StateMachine<S, E> stateMachine, T contextObj) throws Exception {
final StateMachineContext<S, E> context = stateMachinePersist.read(contextObj);
stateMachine.stopReactively().block();
stateMachine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(context));
stateMachine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(context).block());
stateMachine.startReactively().block();
return stateMachine;
}

View File

@@ -166,7 +166,7 @@ public class DefaultStateMachineService<S, E> implements StateMachineService<S,
}
stateMachine.stopReactively().block();
// only go via top region
stateMachine.getStateMachineAccessor().doWithRegion(function -> function.resetStateMachine(stateMachineContext));
stateMachine.getStateMachineAccessor().doWithRegion(function -> function.resetStateMachineReactively(stateMachineContext).block());
return stateMachine;
}

View File

@@ -763,20 +763,16 @@ public abstract class AbstractStateMachine<S, E> extends StateMachineObjectSuppo
.then();
mono = mono.then(resetMono);
} else if (s.isOrthogonal() && stateMachineContext.getChilds() != null) {
Collection<Region<S, E>> regions = ((AbstractState<S, E>)s).getRegions();
Mono<Void> resetMono = Flux.fromIterable(regions)
.flatMap(region -> {
return Flux.fromIterable(stateMachineContext.getChilds())
.flatMap(child -> {
return Mono.fromRunnable(() -> {
((StateMachine<S, E>)region).getStateMachineAccessor()
.doWithRegion(function -> function.resetStateMachine(child));
});
})
.then();
})
Collection<Region<S, E>> regions = ((AbstractState<S, E>) s).getRegions();
Mono<Void> resetMono = Flux.fromIterable(regions).flatMap(region ->
Flux.fromIterable(stateMachineContext.getChilds())
.flatMap(child ->
((StateMachine<S, E>) region).getStateMachineAccessor().withRegion().resetStateMachineReactively(child)
)
.then()
)
.then();
mono = mono.then(resetMono);
mono = mono.thenEmpty(resetMono);
}
if (log.isDebugEnabled()) {
@@ -787,20 +783,19 @@ public abstract class AbstractStateMachine<S, E> extends StateMachineObjectSuppo
break;
} else if (stateMachineContext.getChilds() != null && !stateMachineContext.getChilds().isEmpty()) {
if (s.isOrthogonal()) {
Collection<Region<S, E>> regions = ((AbstractState<S, E>)s).getRegions();
Mono<Void> resetMono = Flux.fromIterable(regions)
.flatMap(region -> {
return Flux.fromIterable(stateMachineContext.getChilds())
Collection<Region<S, E>> regions = ((AbstractState<S, E>) s).getRegions();
Mono<Void> resetMono = Flux.fromIterable(regions).flatMap(region ->
Flux.fromIterable(stateMachineContext.getChilds())
.flatMap(child -> {
return Mono.fromRunnable(() -> {
if (ObjectUtils.nullSafeEquals(region.getId(), child.getId())) {
((StateMachine<S, E>)region).getStateMachineAccessor()
.doWithRegion(function -> function.resetStateMachine(child));
}
});
if (ObjectUtils.nullSafeEquals(region.getId(), child.getId())) {
return ((StateMachine<S, E>) region).getStateMachineAccessor()
.withRegion().resetStateMachineReactively(child);
} else {
return Mono.empty();
}
})
.then();
})
.then()
)
.then();
monos.add(resetMono);
} else {

View File

@@ -77,7 +77,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
ExtendedState extendedState = new DefaultExtendedState(variables);
DefaultStateMachineContext<States,Events> stateMachineContext = new DefaultStateMachineContext<States, Events>(States.S12, Events.I, null, extendedState);
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext));
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block());
doStartAndAssert(machine);
assertThat(machine.getState().getIds()).containsOnly(States.S0, States.S1, States.S12);
@@ -95,7 +95,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
ExtendedState extendedState = new DefaultExtendedState(variables);
DefaultStateMachineContext<States,Events> stateMachineContext = new DefaultStateMachineContext<States, Events>(States.S211, Events.C, null, extendedState);
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext));
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block());
doStartAndAssert(machine);
assertThat(machine.getState().getIds()).containsOnly(States.S0, States.S2, States.S21, States.S211);
@@ -113,7 +113,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
ExtendedState extendedState = new DefaultExtendedState(variables);
DefaultStateMachineContext<States,Events> stateMachineContext = new DefaultStateMachineContext<States, Events>(States.S2, Events.C, null, extendedState);
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext));
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block());
doStartAndAssert(machine);
assertThat(machine.getState().getIds()).containsOnly(States.S0, States.S2, States.S21, States.S211);
@@ -138,7 +138,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
DefaultStateMachineContext<TestStates, TestEvents> stateMachineContext =
new DefaultStateMachineContext<TestStates, TestEvents>(childs, TestStates.S2, TestEvents.E1, null, null);
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext));
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block());
doStartAndAssert(machine);
assertThat(machine.getState().getIds()).containsOnly(TestStates.S2, TestStates.S21, TestStates.S31);
@@ -162,7 +162,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
DefaultStateMachineContext<TestStates, TestEvents> stateMachineContext =
new DefaultStateMachineContext<TestStates, TestEvents>(childs, TestStates.S2, null, null, null);
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext));
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block());
doStartAndAssert(machine);
assertThat(machine.getState().getIds()).containsOnly(TestStates.S2, TestStates.S21, TestStates.S31);
@@ -184,7 +184,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
ExtendedState extendedState = new DefaultExtendedState(variables);
DefaultStateMachineContext<States,Events> stateMachineContext = new DefaultStateMachineContext<States, Events>(States.S0, null, null, extendedState);
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext));
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block());
doStartAndAssert(machine);
assertThat((Integer)machine.getExtendedState().getVariables().get("count")).isEqualTo(1);
@@ -207,7 +207,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
assertThat((Integer)machine.getExtendedState().getVariables().get("foo")).isZero();
doStopAndAssert(machine);
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(null));
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(null).block());
doStartAndAssert(machine);
assertThat(machine.getState().getIds()).containsOnly(States.S0, States.S1, States.S11);
assertThat(machine.getExtendedState().getVariables()).isEmpty();
@@ -229,7 +229,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
DefaultStateMachineContext<States, Events> stateMachineContext = new DefaultStateMachineContext<States, Events>(
States.S11, null, null, null);
machine.getStateMachineAccessor()
.doWithAllRegions(function -> function.resetStateMachine(stateMachineContext));
.doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block());
doStartAndAssert(machine);
assertThat(machine.getState().getIds()).containsOnly(States.S0, States.S1, States.S11);
@@ -246,7 +246,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
DefaultStateMachineContext<States, Events> stateMachineContext = new DefaultStateMachineContext<States, Events>(States.S1, null,
null, null);
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext));
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block());
doStartAndAssert(machine);
Thread.sleep(1100);
@@ -290,7 +290,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
ExtendedState extendedState = new DefaultExtendedState(variables);
DefaultStateMachineContext<States,Events> stateMachineContext = new DefaultStateMachineContext<States, Events>(States.S0, null, null, extendedState);
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext));
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block());
doStartAndAssert(machine);
assertThat((Integer)machine.getExtendedState().getVariables().get("count1")).isEqualTo(1);
@@ -321,7 +321,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
DefaultStateMachineContext<MyState, MyEvent> stateMachineContext = new DefaultStateMachineContext<MyState, MyEvent>(
SubState.SUB_NEXT, null, null, null);
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext));
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block());
doStartAndAssert(machine);
assertThat(machine.getState().getIds()).containsOnly(SuperState.PARENT, SubState.SUB_NEXT);
@@ -337,7 +337,7 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
DefaultStateMachineContext<MyState, MyEvent> stateMachineContext = new DefaultStateMachineContext<MyState, MyEvent>(
SuperState.INITIAL, null, null, null);
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(stateMachineContext));
machine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(stateMachineContext).block());
doStartAndAssert(machine);
assertThat(machine.getState().getIds()).containsOnly(SuperState.INITIAL);

View File

@@ -24,6 +24,7 @@ import org.springframework.statemachine.support.DefaultStateMachineContext;
import org.springframework.statemachine.support.LifecycleObjectSupport;
import org.springframework.statemachine.support.StateMachineInterceptorAdapter;
import org.springframework.statemachine.transition.Transition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Iterator;
@@ -63,7 +64,7 @@ public abstract class AbstractPersistStateMachineHandler<S, E> extends Lifecycle
stateMachine.stopReactively().block();
List<StateMachineAccess<S, E>> withAllRegions = stateMachine.getStateMachineAccessor().withAllRegions();
for (StateMachineAccess<S, E> a : withAllRegions) {
a.resetStateMachine(new DefaultStateMachineContext<S, E>(state, null, null, null));
a.resetStateMachineReactively(new DefaultStateMachineContext<S, E>(state, null, null, null)).block();
}
stateMachine.startReactively().block();
return stateMachine.sendEvent(event);
@@ -77,18 +78,18 @@ public abstract class AbstractPersistStateMachineHandler<S, E> extends Lifecycle
* @return mono for completion
*/
public Mono<Void> handleEventWithStateReactively(Message<E> event, S state) {
StateMachine<S, E> stateMachine = getInitStateMachine();
// TODO: REACTOR add docs and revisit this function concept
return Mono.from(stateMachine.stopReactively())
.then(Mono.fromRunnable(() -> {
List<StateMachineAccess<S, E>> withAllRegions = stateMachine.getStateMachineAccessor().withAllRegions();
for (StateMachineAccess<S, E> a : withAllRegions) {
a.resetStateMachine(new DefaultStateMachineContext<S, E>(state, null, null, null));
}
}))
.then(stateMachine.startReactively())
.thenMany(stateMachine.sendEvent(Mono.just(event)))
.then();
return Mono.defer(() -> {
StateMachine<S, E> stateMachine = getInitStateMachine();
// TODO: REACTOR add docs and revisit this function concept
return Mono.from(stateMachine.stopReactively())
.thenEmpty(
Flux.fromIterable(stateMachine.getStateMachineAccessor().withAllRegions())
.flatMap(region -> region.resetStateMachineReactively(new DefaultStateMachineContext<S, E>(state, null, null, null)))
)
.then(stateMachine.startReactively())
.thenMany(stateMachine.sendEvent(Mono.just(event)))
.then();
});
}
/**

View File

@@ -166,7 +166,7 @@ public class TasksHandler {
}
stateMachine.stopReactively().block();
stateMachine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachine(context));
stateMachine.getStateMachineAccessor().doWithAllRegions(function -> function.resetStateMachineReactively(context).block());
stateMachine.startReactively().block();
}