Change machine reset to reactive

- Re-implement resetStateMachineReactively as fully reactive
  chain by following same logic as in blocking version.
- New interface ReactiveStateMachineAccess as we need to
  know if one need to call reactive method.
- Fixes #911
This commit is contained in:
Janne Valkealahti
2021-03-12 14:08:20 +00:00
parent e56591c5e9
commit 8e613cc135
5 changed files with 254 additions and 122 deletions

View File

@@ -0,0 +1,40 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.statemachine.access;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.StateMachineContext;
import reactor.core.publisher.Mono;
/**
* Functional interface exposing reactive {@link StateMachine} internals.
*
* @author Janne Valkealahti
*
* @param <S> the type of state
* @param <E> the type of event
*/
public interface ReactiveStateMachineAccess<S, E> {
/**
* Reset state machine reactively.
*
* @param stateMachineContext the state machine context
* @return mono for completion
*/
Mono<Void> resetStateMachineReactively(StateMachineContext<S, E> stateMachineContext);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015 the original author or authors.
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@ import org.springframework.statemachine.support.StateMachineInterceptor;
* @param <S> the type of state
* @param <E> the type of event
*/
public interface StateMachineAccess<S, E> {
public interface StateMachineAccess<S, E> extends ReactiveStateMachineAccess<S, E> {
/**
* Sets the relay state machine.

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.context.Lifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
@@ -710,147 +709,187 @@ public abstract class AbstractStateMachine<S, E> extends StateMachineObjectSuppo
return buf.toString();
}
@SuppressWarnings("rawtypes")
@Override
public void resetStateMachine(StateMachineContext<S, E> stateMachineContext) {
// TODO: this function needs a serious rewrite
if (stateMachineContext == null) {
log.info("Got null context, resetting to initial state, clearing extended state and machine id");
currentState = initialState;
extendedState.getVariables().clear();
setId(null);
return;
}
if (log.isDebugEnabled()) {
log.debug("Request to reset state machine: stateMachine=[" + this + "] stateMachineContext=[" + stateMachineContext + "]");
}
setId(stateMachineContext.getId());
S state = stateMachineContext.getState();
boolean stateSet = false;
// handle state reset
for (State<S, E> s : getStates()) {
for (State<S, E> ss : s.getStates()) {
resetStateMachineReactively(stateMachineContext).block();
}
boolean enumMatch = false;
if (state instanceof Enum && ss.getId() instanceof Enum && state.getClass() == ss.getId().getClass()
&& ((Enum) ss.getId()).ordinal() == ((Enum) state).ordinal()) {
enumMatch = true;
}
@SuppressWarnings("rawtypes")
@Override
public Mono<Void> resetStateMachineReactively(StateMachineContext<S, E> stateMachineContext) {
return Mono.defer(() -> {
if (stateMachineContext == null) {
log.info("Got null context, resetting to initial state, clearing extended state and machine id");
currentState = initialState;
extendedState.getVariables().clear();
setId(null);
return Mono.empty();
}
if (log.isDebugEnabled()) {
log.debug("Request to reset state machine: stateMachine=[" + this + "] stateMachineContext=[" + stateMachineContext + "]");
}
setId(stateMachineContext.getId());
S state = stateMachineContext.getState();
boolean stateSet = false;
if (state != null && (ss.getIds().contains(state) || enumMatch) ) {
currentState = s;
// setting lastState here is needed for restore
lastState = currentState;
// TODO: not sure about starting submachine/regions here, though
// needed if we only transit to super state or reset regions
if (s.isSubmachineState()) {
StateMachine<S, E> submachine = ((AbstractState<S, E>)s).getSubmachine();
List<Mono<Void>> monos = new ArrayList<>();
stateMachineContext.getChilds()
.forEach(child -> submachine.getStateMachineAccessor()
.doWithRegion(function -> function.resetStateMachine(child)));
for (State<S, E> s : getStates()) {
for (State<S, E> ss : s.getStates()) {
boolean enumMatch = false;
if (state instanceof Enum && ss.getId() instanceof Enum && state.getClass() == ss.getId().getClass()
&& ((Enum) ss.getId()).ordinal() == ((Enum) state).ordinal()) {
enumMatch = true;
}
} else if (s.isOrthogonal() && stateMachineContext.getChilds() != null) {
Collection<Region<S, E>> regions = ((AbstractState<S, E>)s).getRegions();
for (Region<S, E> region : regions) {
for (final StateMachineContext<S, E> child : stateMachineContext.getChilds()) {
((StateMachine<S, E>)region).getStateMachineAccessor()
.doWithRegion(function -> function.resetStateMachine(child));
}
if (state != null && (ss.getIds().contains(state) || enumMatch) ) {
Mono<Void> mono = Mono.fromRunnable(() -> {
currentState = s;
// setting lastState here is needed for restore
lastState = currentState;
});
if (s.isSubmachineState()) {
StateMachine<S, E> submachine = ((AbstractState<S, E>)s).getSubmachine();
Mono<Void> resetMono = Flux.fromIterable(stateMachineContext.getChilds())
.map(child -> submachine.getStateMachineAccessor())
.flatMap(region -> region.withRegion().resetStateMachineReactively(stateMachineContext))
.then();
mono = mono.then(resetMono);
} else if (s.isOrthogonal() && stateMachineContext.getChilds() != null) {
Collection<Region<S, E>> regions = ((AbstractState<S, E>)s).getRegions();
Mono<Void> resetMono = Flux.fromIterable(regions)
.flatMap(region -> {
return Flux.fromIterable(stateMachineContext.getChilds())
.flatMap(child -> {
return Mono.fromRunnable(() -> {
((StateMachine<S, E>)region).getStateMachineAccessor()
.doWithRegion(function -> function.resetStateMachine(child));
});
})
.then();
})
.then();
mono = mono.then(resetMono);
}
}
if (log.isDebugEnabled()) {
log.debug("State reseted: stateMachine=[" + this + "] stateMachineContext=[" + stateMachineContext + "]");
}
stateSet = true;
break;
} else if (stateMachineContext.getChilds() != null && !stateMachineContext.getChilds().isEmpty()) {
// we're here because root machine only have regions
if (s.isOrthogonal()) {
Collection<Region<S, E>> regions = ((AbstractState<S, E>)s).getRegions();
for (Region<S, E> region : regions) {
if (log.isDebugEnabled()) {
log.debug("State reseted: stateMachine=[" + this + "] stateMachineContext=[" + stateMachineContext + "]");
}
monos.add(mono);
stateSet = true;
break;
} else if (stateMachineContext.getChilds() != null && !stateMachineContext.getChilds().isEmpty()) {
if (s.isOrthogonal()) {
Collection<Region<S, E>> regions = ((AbstractState<S, E>)s).getRegions();
Mono<Void> resetMono = Flux.fromIterable(regions)
.flatMap(region -> {
return Flux.fromIterable(stateMachineContext.getChilds())
.flatMap(child -> {
return Mono.fromRunnable(() -> {
if (ObjectUtils.nullSafeEquals(region.getId(), child.getId())) {
((StateMachine<S, E>)region).getStateMachineAccessor()
.doWithRegion(function -> function.resetStateMachine(child));
}
});
})
.then();
})
.then();
monos.add(resetMono);
} else {
Mono<Void> mono = Mono.empty();
for (final StateMachineContext<S, E> child : stateMachineContext.getChilds()) {
// only call if reqion id matches with context id
if (ObjectUtils.nullSafeEquals(region.getId(), child.getId())) {
((StateMachine<S, E>)region).getStateMachineAccessor()
.doWithRegion(function -> function.resetStateMachine(child));
S state2 = child.getState();
boolean enumMatch2 = false;
if (state2 instanceof Enum && ss.getId() instanceof Enum
&& state.getClass() == ss.getId().getClass()
&& ((Enum) ss.getId()).ordinal() == ((Enum) state2).ordinal()) {
enumMatch2 = true;
}
if (state2 != null && (ss.getIds().contains(state2) || enumMatch2) ) {
mono = Mono.fromRunnable(() -> {
currentState = s;
lastState = currentState;
});
stateSet = true;
break;
}
}
}
} else {
for (final StateMachineContext<S, E> child : stateMachineContext.getChilds()) {
S state2 = child.getState();
boolean enumMatch2 = false;
if (state2 instanceof Enum && ss.getId() instanceof Enum
&& state.getClass() == ss.getId().getClass()
&& ((Enum) ss.getId()).ordinal() == ((Enum) state2).ordinal()) {
enumMatch2 = true;
}
if (state2 != null && (ss.getIds().contains(state2) || enumMatch2) ) {
currentState = s;
lastState = currentState;
stateSet = true;
break;
}
monos.add(mono);
}
}
}
}
if (stateSet) {
break;
}
}
// handle history reset here as above state reset loop breaks out
if (history != null && stateMachineContext.getHistoryStates() != null) {
// setting history for 'this' machine
State<S, E> h = null;
for (State<S, E> hh : getStates()) {
if (hh.getId().equals(stateMachineContext.getHistoryStates().get(null))) {
h = hh;
if (stateSet) {
break;
}
}
if (h != null) {
((HistoryPseudoState<S, E>) history).setState(h);
}
}
for (State<S, E> s : getStates()) {
if (StateMachineUtils.isPseudoState(s, PseudoStateKind.JOIN)) {
JoinPseudoState<S, E> jps = (JoinPseudoState<S, E>) s.getPseudoState();
Collection<S> ids = currentState.getIds();
jps.reset(ids);
}
// setting history for 'submachines'
if (s.isSubmachineState()) {
StateMachine<S, E> submachine = ((AbstractState<S, E>) s).getSubmachine();
PseudoState<S, E> submachineHistory = ((AbstractStateMachine<S, E>) submachine).getHistoryState();
if (submachineHistory != null) {
if (history != null && stateMachineContext.getHistoryStates() != null) {
Mono<Void> mono = Mono.fromRunnable(() -> {
// setting history for 'this' machine
State<S, E> h = null;
for (State<S, E> hh : submachine.getStates()) {
if (hh.getId().equals(stateMachineContext.getHistoryStates().get(s.getId()))) {
for (State<S, E> hh : getStates()) {
if (hh.getId().equals(stateMachineContext.getHistoryStates().get(null))) {
h = hh;
break;
}
}
if (h != null) {
((HistoryPseudoState<S, E>) submachineHistory).setState(h);
((HistoryPseudoState<S, E>) history).setState(h);
}
}
});
monos.add(mono);
}
}
if (stateSet && stateMachineContext.getExtendedState() != null) {
this.extendedState.getVariables().clear();
this.extendedState.getVariables().putAll(stateMachineContext.getExtendedState().getVariables());
}
if (currentState instanceof Lifecycle) {
((Lifecycle)currentState).start();
}
for (State<S, E> s : getStates()) {
Mono<Void> mono = Mono.fromRunnable(() -> {
if (StateMachineUtils.isPseudoState(s, PseudoStateKind.JOIN)) {
JoinPseudoState<S, E> jps = (JoinPseudoState<S, E>) s.getPseudoState();
Collection<S> ids = currentState.getIds();
jps.reset(ids);
}
// setting history for 'submachines'
if (s.isSubmachineState()) {
StateMachine<S, E> submachine = ((AbstractState<S, E>) s).getSubmachine();
PseudoState<S, E> submachineHistory = ((AbstractStateMachine<S, E>) submachine).getHistoryState();
if (submachineHistory != null) {
State<S, E> h = null;
for (State<S, E> hh : submachine.getStates()) {
if (hh.getId().equals(stateMachineContext.getHistoryStates().get(s.getId()))) {
h = hh;
break;
}
}
if (h != null) {
((HistoryPseudoState<S, E>) submachineHistory).setState(h);
}
}
}
});
monos.add(mono);
}
if (stateSet && stateMachineContext.getExtendedState() != null) {
Mono<Void> mono = Mono.fromRunnable(() -> {
this.extendedState.getVariables().clear();
this.extendedState.getVariables().putAll(stateMachineContext.getExtendedState().getVariables());
});
monos.add(mono);
}
return Flux.concat(monos).then();
})
.thenEmpty(Mono.defer(() -> {
if (currentState instanceof StateMachineReactiveLifecycle) {
return ((StateMachineReactiveLifecycle) currentState).startReactively();
}
return Mono.empty();
}));
}
@Override

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@@ -46,6 +47,11 @@ import org.springframework.statemachine.listener.StateMachineListenerAdapter;
import org.springframework.statemachine.support.DefaultExtendedState;
import org.springframework.statemachine.support.DefaultStateMachineContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
/**
* Tests for resetting a state machine state and extended variables using a
* {@link StateMachineContext}.
@@ -777,4 +783,46 @@ public class StateMachineResetTests extends AbstractStateMachineTests {
public enum MyEvent {
GO;
}}
}
@Test
public void testResetError() {
context.register(Config7.class);
context.refresh();
StateMachine<TestStates, TestEvents> machine = resolveMachine(context);
DefaultStateMachineContext<TestStates, TestEvents> stateMachineContext =
new DefaultStateMachineContext<TestStates, TestEvents>(TestStates.S1, null, null, null);
Stream<Mono<Void>> monos = machine.getStateMachineAccessor().withAllRegions().stream()
.map(a -> a.resetStateMachineReactively(stateMachineContext));
Mono<Void> resetMono = Flux.fromStream(monos).flatMap(m -> m).next().publishOn(Schedulers.single());
StepVerifier.create(resetMono).expectComplete().verify();
doStartAndAssert(machine);
assertThat(machine.getState().getIds()).containsOnly(TestStates.S1);
}
@Configuration
@EnableStateMachine
static class Config7 extends EnumStateMachineConfigurerAdapter<TestStates, TestEvents> {
@Override
public void configure(StateMachineStateConfigurer<TestStates, TestEvents> states) throws Exception {
states
.withStates()
.initial(TestStates.SI)
.state(TestStates.S1);
}
@Override
public void configure(StateMachineTransitionConfigurer<TestStates, TestEvents> transitions) throws Exception {
transitions
.withExternal()
.source(TestStates.SI)
.target(TestStates.S1)
.event(TestEvents.E1);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -110,6 +110,11 @@ public class StateMachineAccessTests {
public void resetStateMachine(StateMachineContext<String, String> stateMachineContext) {
}
@Override
public Mono<Void> resetStateMachineReactively(StateMachineContext<String, String> stateMachineContext) {
return Mono.empty();
}
@Override
public Mono<Void> startReactively() {
return null;