From 8e613cc135bcfaf2a975f5b092daa5c8b8ddd397 Mon Sep 17 00:00:00 2001 From: Janne Valkealahti Date: Fri, 12 Mar 2021 14:08:20 +0000 Subject: [PATCH] Change machine reset to reactive - Re-implement resetStateMachineReactively as fully reactive chain by following same logic as in blocking version. - New interface ReactiveStateMachineAccess as we need to know if one need to call reactive method. - Fixes #911 --- .../access/ReactiveStateMachineAccess.java | 40 +++ .../access/StateMachineAccess.java | 4 +- .../support/AbstractStateMachine.java | 273 ++++++++++-------- .../statemachine/StateMachineResetTests.java | 52 +++- .../access/StateMachineAccessTests.java | 7 +- 5 files changed, 254 insertions(+), 122 deletions(-) create mode 100644 spring-statemachine-core/src/main/java/org/springframework/statemachine/access/ReactiveStateMachineAccess.java diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/access/ReactiveStateMachineAccess.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/access/ReactiveStateMachineAccess.java new file mode 100644 index 00000000..9636ae60 --- /dev/null +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/access/ReactiveStateMachineAccess.java @@ -0,0 +1,40 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.statemachine.access; + +import org.springframework.statemachine.StateMachine; +import org.springframework.statemachine.StateMachineContext; + +import reactor.core.publisher.Mono; + +/** + * Functional interface exposing reactive {@link StateMachine} internals. + * + * @author Janne Valkealahti + * + * @param the type of state + * @param the type of event + */ +public interface ReactiveStateMachineAccess { + + /** + * Reset state machine reactively. + * + * @param stateMachineContext the state machine context + * @return mono for completion + */ + Mono resetStateMachineReactively(StateMachineContext stateMachineContext); +} diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/access/StateMachineAccess.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/access/StateMachineAccess.java index 06de4736..01770177 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/access/StateMachineAccess.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/access/StateMachineAccess.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 the original author or authors. + * Copyright 2015-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ import org.springframework.statemachine.support.StateMachineInterceptor; * @param the type of state * @param the type of event */ -public interface StateMachineAccess { +public interface StateMachineAccess extends ReactiveStateMachineAccess { /** * Sets the relay state machine. diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java index f223c8d0..94b89c59 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; -import org.springframework.context.Lifecycle; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @@ -710,147 +709,187 @@ public abstract class AbstractStateMachine extends StateMachineObjectSuppo return buf.toString(); } - @SuppressWarnings("rawtypes") @Override public void resetStateMachine(StateMachineContext stateMachineContext) { - // TODO: this function needs a serious rewrite - if (stateMachineContext == null) { - log.info("Got null context, resetting to initial state, clearing extended state and machine id"); - currentState = initialState; - extendedState.getVariables().clear(); - setId(null); - return; - } - if (log.isDebugEnabled()) { - log.debug("Request to reset state machine: stateMachine=[" + this + "] stateMachineContext=[" + stateMachineContext + "]"); - } - setId(stateMachineContext.getId()); - S state = stateMachineContext.getState(); - boolean stateSet = false; - // handle state reset - for (State s : getStates()) { - for (State ss : s.getStates()) { + resetStateMachineReactively(stateMachineContext).block(); + } - boolean enumMatch = false; - if (state instanceof Enum && ss.getId() instanceof Enum && state.getClass() == ss.getId().getClass() - && ((Enum) ss.getId()).ordinal() == ((Enum) state).ordinal()) { - enumMatch = true; - } + @SuppressWarnings("rawtypes") + @Override + public Mono resetStateMachineReactively(StateMachineContext stateMachineContext) { + return Mono.defer(() -> { + if (stateMachineContext == null) { + log.info("Got null context, resetting to initial state, clearing extended state and machine id"); + currentState = initialState; + extendedState.getVariables().clear(); + setId(null); + return Mono.empty(); + } + if (log.isDebugEnabled()) { + log.debug("Request to reset state machine: stateMachine=[" + this + "] stateMachineContext=[" + stateMachineContext + "]"); + } + setId(stateMachineContext.getId()); + S state = stateMachineContext.getState(); + boolean stateSet = false; - if (state != null && (ss.getIds().contains(state) || enumMatch) ) { - currentState = s; - // setting lastState here is needed for restore - lastState = currentState; - // TODO: not sure about starting submachine/regions here, though - // needed if we only transit to super state or reset regions - if (s.isSubmachineState()) { - StateMachine submachine = ((AbstractState)s).getSubmachine(); + List> monos = new ArrayList<>(); - stateMachineContext.getChilds() - .forEach(child -> submachine.getStateMachineAccessor() - .doWithRegion(function -> function.resetStateMachine(child))); + for (State s : getStates()) { + for (State ss : s.getStates()) { + boolean enumMatch = false; + if (state instanceof Enum && ss.getId() instanceof Enum && state.getClass() == ss.getId().getClass() + && ((Enum) ss.getId()).ordinal() == ((Enum) state).ordinal()) { + enumMatch = true; + } - } else if (s.isOrthogonal() && stateMachineContext.getChilds() != null) { - Collection> regions = ((AbstractState)s).getRegions(); - for (Region region : regions) { - for (final StateMachineContext child : stateMachineContext.getChilds()) { - ((StateMachine)region).getStateMachineAccessor() - .doWithRegion(function -> function.resetStateMachine(child)); - } + if (state != null && (ss.getIds().contains(state) || enumMatch) ) { + + Mono mono = Mono.fromRunnable(() -> { + currentState = s; + // setting lastState here is needed for restore + lastState = currentState; + }); + + if (s.isSubmachineState()) { + StateMachine submachine = ((AbstractState)s).getSubmachine(); + Mono resetMono = Flux.fromIterable(stateMachineContext.getChilds()) + .map(child -> submachine.getStateMachineAccessor()) + .flatMap(region -> region.withRegion().resetStateMachineReactively(stateMachineContext)) + .then(); + mono = mono.then(resetMono); + } else if (s.isOrthogonal() && stateMachineContext.getChilds() != null) { + Collection> regions = ((AbstractState)s).getRegions(); + Mono resetMono = Flux.fromIterable(regions) + .flatMap(region -> { + return Flux.fromIterable(stateMachineContext.getChilds()) + .flatMap(child -> { + return Mono.fromRunnable(() -> { + ((StateMachine)region).getStateMachineAccessor() + .doWithRegion(function -> function.resetStateMachine(child)); + }); + }) + .then(); + }) + .then(); + mono = mono.then(resetMono); } - } - if (log.isDebugEnabled()) { - log.debug("State reseted: stateMachine=[" + this + "] stateMachineContext=[" + stateMachineContext + "]"); - } - stateSet = true; - break; - } else if (stateMachineContext.getChilds() != null && !stateMachineContext.getChilds().isEmpty()) { - // we're here because root machine only have regions - if (s.isOrthogonal()) { - Collection> regions = ((AbstractState)s).getRegions(); - - for (Region region : regions) { + if (log.isDebugEnabled()) { + log.debug("State reseted: stateMachine=[" + this + "] stateMachineContext=[" + stateMachineContext + "]"); + } + monos.add(mono); + stateSet = true; + break; + } else if (stateMachineContext.getChilds() != null && !stateMachineContext.getChilds().isEmpty()) { + if (s.isOrthogonal()) { + Collection> regions = ((AbstractState)s).getRegions(); + Mono resetMono = Flux.fromIterable(regions) + .flatMap(region -> { + return Flux.fromIterable(stateMachineContext.getChilds()) + .flatMap(child -> { + return Mono.fromRunnable(() -> { + if (ObjectUtils.nullSafeEquals(region.getId(), child.getId())) { + ((StateMachine)region).getStateMachineAccessor() + .doWithRegion(function -> function.resetStateMachine(child)); + } + }); + }) + .then(); + }) + .then(); + monos.add(resetMono); + } else { + Mono mono = Mono.empty(); for (final StateMachineContext child : stateMachineContext.getChilds()) { - // only call if reqion id matches with context id - if (ObjectUtils.nullSafeEquals(region.getId(), child.getId())) { - ((StateMachine)region).getStateMachineAccessor() - .doWithRegion(function -> function.resetStateMachine(child)); + S state2 = child.getState(); + boolean enumMatch2 = false; + if (state2 instanceof Enum && ss.getId() instanceof Enum + && state.getClass() == ss.getId().getClass() + && ((Enum) ss.getId()).ordinal() == ((Enum) state2).ordinal()) { + enumMatch2 = true; + } + + if (state2 != null && (ss.getIds().contains(state2) || enumMatch2) ) { + mono = Mono.fromRunnable(() -> { + currentState = s; + lastState = currentState; + }); + stateSet = true; + break; } } - } - } else { - for (final StateMachineContext child : stateMachineContext.getChilds()) { - S state2 = child.getState(); - boolean enumMatch2 = false; - if (state2 instanceof Enum && ss.getId() instanceof Enum - && state.getClass() == ss.getId().getClass() - && ((Enum) ss.getId()).ordinal() == ((Enum) state2).ordinal()) { - enumMatch2 = true; - } - - if (state2 != null && (ss.getIds().contains(state2) || enumMatch2) ) { - currentState = s; - lastState = currentState; - stateSet = true; - break; - } + monos.add(mono); } } } - } - if (stateSet) { - break; - } - } - - // handle history reset here as above state reset loop breaks out - if (history != null && stateMachineContext.getHistoryStates() != null) { - // setting history for 'this' machine - State h = null; - for (State hh : getStates()) { - if (hh.getId().equals(stateMachineContext.getHistoryStates().get(null))) { - h = hh; + if (stateSet) { break; } } - if (h != null) { - ((HistoryPseudoState) history).setState(h); - } - } - for (State s : getStates()) { - if (StateMachineUtils.isPseudoState(s, PseudoStateKind.JOIN)) { - JoinPseudoState jps = (JoinPseudoState) s.getPseudoState(); - Collection ids = currentState.getIds(); - jps.reset(ids); - } - // setting history for 'submachines' - if (s.isSubmachineState()) { - StateMachine submachine = ((AbstractState) s).getSubmachine(); - PseudoState submachineHistory = ((AbstractStateMachine) submachine).getHistoryState(); - if (submachineHistory != null) { + if (history != null && stateMachineContext.getHistoryStates() != null) { + Mono mono = Mono.fromRunnable(() -> { + // setting history for 'this' machine State h = null; - for (State hh : submachine.getStates()) { - if (hh.getId().equals(stateMachineContext.getHistoryStates().get(s.getId()))) { + for (State hh : getStates()) { + if (hh.getId().equals(stateMachineContext.getHistoryStates().get(null))) { h = hh; break; } } if (h != null) { - ((HistoryPseudoState) submachineHistory).setState(h); + ((HistoryPseudoState) history).setState(h); } - } - + }); + monos.add(mono); } - } - if (stateSet && stateMachineContext.getExtendedState() != null) { - this.extendedState.getVariables().clear(); - this.extendedState.getVariables().putAll(stateMachineContext.getExtendedState().getVariables()); - } - if (currentState instanceof Lifecycle) { - ((Lifecycle)currentState).start(); - } + + for (State s : getStates()) { + Mono mono = Mono.fromRunnable(() -> { + if (StateMachineUtils.isPseudoState(s, PseudoStateKind.JOIN)) { + JoinPseudoState jps = (JoinPseudoState) s.getPseudoState(); + Collection ids = currentState.getIds(); + jps.reset(ids); + } + + // setting history for 'submachines' + if (s.isSubmachineState()) { + StateMachine submachine = ((AbstractState) s).getSubmachine(); + PseudoState submachineHistory = ((AbstractStateMachine) submachine).getHistoryState(); + if (submachineHistory != null) { + State h = null; + for (State hh : submachine.getStates()) { + if (hh.getId().equals(stateMachineContext.getHistoryStates().get(s.getId()))) { + h = hh; + break; + } + } + if (h != null) { + ((HistoryPseudoState) submachineHistory).setState(h); + } + } + + } + }); + monos.add(mono); + } + + if (stateSet && stateMachineContext.getExtendedState() != null) { + Mono mono = Mono.fromRunnable(() -> { + this.extendedState.getVariables().clear(); + this.extendedState.getVariables().putAll(stateMachineContext.getExtendedState().getVariables()); + }); + monos.add(mono); + } + + return Flux.concat(monos).then(); + }) + .thenEmpty(Mono.defer(() -> { + if (currentState instanceof StateMachineReactiveLifecycle) { + return ((StateMachineReactiveLifecycle) currentState).startReactively(); + } + return Mono.empty(); + })); } @Override diff --git a/spring-statemachine-core/src/test/java/org/springframework/statemachine/StateMachineResetTests.java b/spring-statemachine-core/src/test/java/org/springframework/statemachine/StateMachineResetTests.java index 185f552f..4655057e 100644 --- a/spring-statemachine-core/src/test/java/org/springframework/statemachine/StateMachineResetTests.java +++ b/spring-statemachine-core/src/test/java/org/springframework/statemachine/StateMachineResetTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -46,6 +47,11 @@ import org.springframework.statemachine.listener.StateMachineListenerAdapter; import org.springframework.statemachine.support.DefaultExtendedState; import org.springframework.statemachine.support.DefaultStateMachineContext; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; + /** * Tests for resetting a state machine state and extended variables using a * {@link StateMachineContext}. @@ -777,4 +783,46 @@ public class StateMachineResetTests extends AbstractStateMachineTests { public enum MyEvent { GO; - }} + } + + @Test + public void testResetError() { + context.register(Config7.class); + context.refresh(); + StateMachine machine = resolveMachine(context); + + DefaultStateMachineContext stateMachineContext = + new DefaultStateMachineContext(TestStates.S1, null, null, null); + + Stream> monos = machine.getStateMachineAccessor().withAllRegions().stream() + .map(a -> a.resetStateMachineReactively(stateMachineContext)); + Mono resetMono = Flux.fromStream(monos).flatMap(m -> m).next().publishOn(Schedulers.single()); + StepVerifier.create(resetMono).expectComplete().verify(); + + doStartAndAssert(machine); + assertThat(machine.getState().getIds()).containsOnly(TestStates.S1); + } + + @Configuration + @EnableStateMachine + static class Config7 extends EnumStateMachineConfigurerAdapter { + + @Override + public void configure(StateMachineStateConfigurer states) throws Exception { + states + .withStates() + .initial(TestStates.SI) + .state(TestStates.S1); + } + + @Override + public void configure(StateMachineTransitionConfigurer transitions) throws Exception { + transitions + .withExternal() + .source(TestStates.SI) + .target(TestStates.S1) + .event(TestEvents.E1); + } + + } +} diff --git a/spring-statemachine-core/src/test/java/org/springframework/statemachine/access/StateMachineAccessTests.java b/spring-statemachine-core/src/test/java/org/springframework/statemachine/access/StateMachineAccessTests.java index 08b9ec59..2e8e2728 100644 --- a/spring-statemachine-core/src/test/java/org/springframework/statemachine/access/StateMachineAccessTests.java +++ b/spring-statemachine-core/src/test/java/org/springframework/statemachine/access/StateMachineAccessTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -110,6 +110,11 @@ public class StateMachineAccessTests { public void resetStateMachine(StateMachineContext stateMachineContext) { } + @Override + public Mono resetStateMachineReactively(StateMachineContext stateMachineContext) { + return Mono.empty(); + } + @Override public Mono startReactively() { return null;