Fix reactive event handling

- Before attempting to add better level of error handling,
  discard all errors what might come from action execution.
- Handle some cases where event deny is reported back as accepted.
- Fix reactor error for double subcription.
- Relates #821
This commit is contained in:
Janne Valkealahti
2020-09-09 19:08:07 +01:00
parent 34a28b57fa
commit 706ae5511f
8 changed files with 559 additions and 478 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -466,9 +466,9 @@ public abstract class AbstractState<S, E> extends LifecycleObjectSupport impleme
final AtomicInteger completionCount = new AtomicInteger(stateActions.size());
Long timeout = resolveDoActionTimeout(context);
return Flux.fromIterable(stateActions)
.map(stateAction -> executeAction(stateAction, context))
.map(function -> {
return function
.doOnNext(stateAction -> {
executeAction(stateAction, context)
.onErrorResume(t -> Mono.empty())
.subscribeOn(Schedulers.parallel())
.doOnSubscribe(subscription -> {
if (log.isDebugEnabled()) {
@@ -479,13 +479,13 @@ public abstract class AbstractState<S, E> extends LifecycleObjectSupport impleme
.then(handleCompleteOrEmpty1(context, completionCount))
.subscribe();
})
.then(handleCompleteOrEmpty2(context, completionCount))
;
.then(handleCompleteOrEmpty2(context, completionCount));
});
}
private Mono<Void> handleCompleteOrEmpty1(StateContext<S, E> context, AtomicInteger completionCount) {
return Mono.defer(() -> {
log.debug("handleCompleteOrEmpty1 " + completionCount + " " + stateActions);
if (completionCount.decrementAndGet() <= 0 && stateActions.size() > 0) {
return handleStateDoOnComplete(context)
.then(Mono.fromRunnable(() -> notifyStateOnComplete(context)));

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -146,9 +146,7 @@ public class ObjectState<S, E> extends AbstractSimpleState<S, E> {
public Mono<Void> exit(StateContext<S, E> context) {
Mono<Void> actions = Flux.fromIterable(getExitActions())
.flatMap(a -> executeAction(a, context))
.onErrorContinue((t, u) -> {
// TODO: REACTOR allow continue and fix with error handling overhaul
})
.onErrorResume(t -> Mono.empty())
.then();
return super.exit(context).and(actions);
}
@@ -157,9 +155,7 @@ public class ObjectState<S, E> extends AbstractSimpleState<S, E> {
public Mono<Void> entry(StateContext<S, E> context) {
Mono<Void> actions = Flux.fromIterable(getEntryActions())
.flatMap(a -> executeAction(a, context))
.onErrorContinue((t, u) -> {
// TODO: REACTOR allow continue and fix with error handling overhaul
})
.onErrorResume(t -> Mono.empty())
.then();
return actions.and(super.entry(context));
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -619,6 +619,7 @@ public abstract class AbstractStateMachine<S, E> extends StateMachineObjectSuppo
}
return Mono.just(message)
.map(m -> getStateMachineInterceptors().preEvent(m, this))
// .onErrorResume(error -> Mono.empty())
.onErrorResume(error -> Mono.empty())
.flatMapMany(m -> acceptEvent(m))
.doOnNext(notifyOnDenied());
@@ -642,29 +643,41 @@ public abstract class AbstractStateMachine<S, E> extends StateMachineObjectSuppo
stateMachineExecutor.queueDeferredEvent(message);
return Flux.just(StateMachineEventResult.<S, E>from(this, message, ResultType.DEFERRED));
}
return cs.sendEvent(message).collectList().flatMapMany(l -> {
Flux<StateMachineEventResult<S, E>> ret = Flux.fromIterable(l);
if (!l.stream().anyMatch(er -> er.getResultType() == ResultType.ACCEPTED)) {
Mono<StateMachineEventResult<S, E>> result = Flux.fromIterable(transitions)
.filter(transition -> cs != null && transition.getTrigger() != null)
.filter(transition -> StateMachineUtils.containsAtleastOne(transition.getSource().getIds(), cs.getIds()))
.flatMap(transition -> {
return Mono.from(transition.getTrigger().evaluate(triggerContext))
.flatMap(e -> {
if (e) {
return stateMachineExecutor.queueEvent(Mono.just(message))
.thenReturn(StateMachineEventResult.<S, E>from(this, message, ResultType.ACCEPTED));
} else {
return Mono.empty();
}
});
})
.next()
.switchIfEmpty(Mono.just(StateMachineEventResult.<S, E>from(this, message, ResultType.DENIED)));
ret = ret.concatWith(result);
}
return ret;
});
return cs.sendEvent(message)
.collectList()
.flatMapMany(l -> {
Flux<StateMachineEventResult<S, E>> ret = Flux.fromIterable(l);
if (!l.stream().anyMatch(er -> er.getResultType() == ResultType.ACCEPTED)) {
Mono<StateMachineEventResult<S, E>> result = Flux.fromIterable(transitions)
.filter(transition -> cs != null && transition.getTrigger() != null)
.filter(transition -> StateMachineUtils.containsAtleastOne(transition.getSource().getIds(), cs.getIds()))
.flatMap(transition -> {
return Mono.from(transition.getTrigger().evaluate(triggerContext))
.flatMap(e -> {
if (e) {
return stateMachineExecutor.queueEvent(Mono.just(message))
.then(Mono.defer(() -> {
return Mono.just(StateMachineEventResult.<S, E>from(this, message, ResultType.ACCEPTED));
}))
.onErrorResume(t -> {
return Mono.defer(() -> {
return Mono.just(StateMachineEventResult.<S, E>from(this, message, ResultType.DENIED));
});
});
} else {
return Mono.empty();
}
});
})
.next()
.switchIfEmpty(Mono.defer(() -> {
return Mono.just(StateMachineEventResult.<S, E>from(this, message, ResultType.DENIED));
}));
ret = ret.concatWith(result);
}
return ret;
});
}
return Flux.just(StateMachineEventResult.<S, E>from(this, message, ResultType.DENIED));
});

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@ import org.springframework.messaging.MessageHeaders;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.StateContext.Stage;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.StateMachineException;
import org.springframework.statemachine.StateMachineSystemConstants;
import org.springframework.statemachine.state.JoinPseudoState;
import org.springframework.statemachine.state.PseudoStateKind;
@@ -200,7 +201,11 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
return messages
.flatMap(m -> handleEvent(m))
.doOnNext(i -> {
triggerSink.next(i);
try {
triggerSink.next(i);
} catch (Exception e) {
throw new StateMachineException("Unable to handle queued event", e);
}
})
.then();
}
@@ -325,8 +330,8 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
}
private Mono<Boolean> handleTriggerTrans(List<Transition<S, E>> trans, Message<E> queuedMessage, State<S, E> completion) {
return Flux.fromIterable(trans)
.filter(t -> {
return Flux.fromIterable(trans)
.filter(t -> {
State<S,E> source = t.getSource();
if (source == null) {
return false;
@@ -368,8 +373,7 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
.doFinally(s -> {
joinSyncTransitions.clear();
})
.then(Mono.just(true))
;
.then(Mono.just(true));
} else {
return Mono.just(false);
}
@@ -388,16 +392,16 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
.onErrorResume(e -> {
interceptors.postTransition(stateContext);
return Mono.just(false);
});
})
;
} else {
return Mono.just(false);
}
})
)
.onErrorResume(e -> Mono.just(false));
);
}
})
.takeUntil(x -> x)
.takeUntil(transit -> transit)
.last(false);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,9 +15,7 @@
*/
package org.springframework.statemachine;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -30,6 +28,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.statemachine.config.EnableStateMachine;
import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter;
import org.springframework.statemachine.config.StateMachineConfigurerAdapter;
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;
import org.springframework.statemachine.event.OnStateMachineError;
@@ -48,425 +47,464 @@ import org.springframework.statemachine.transition.Transition;
*/
public class StateMachineErrorTests extends AbstractStateMachineTests {
@Override
protected AnnotationConfigApplicationContext buildContext() {
return new AnnotationConfigApplicationContext();
}
@Test
public void testEvents() throws Exception {
context.register(EventListenerConfig1.class, Config1.class);
context.refresh();
TestApplicationEventListener1 listener1 = context.getBean(TestApplicationEventListener1.class);
TestApplicationEventListener2 listener3 = context.getBean(TestApplicationEventListener2.class);
@SuppressWarnings("unchecked")
ObjectStateMachine<TestStates,TestEvents> machine =
context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, ObjectStateMachine.class);
assertThat(machine.hasStateMachineError(), is(false));
TestStateMachineListener listener2 = new TestStateMachineListener();
machine.addStateListener(listener2);
machine.start();
machine.setStateMachineError(new RuntimeException("myerror"));
assertThat(listener1.latch.await(1, TimeUnit.SECONDS), is(true));
assertThat(listener1.count, is(1));
assertThat(listener3.latch.await(1, TimeUnit.SECONDS), is(true));
assertThat(listener3.count, is(1));
assertThat(listener2.latch.await(1, TimeUnit.SECONDS), is(true));
assertThat(listener2.count, is(1));
assertThat(machine.hasStateMachineError(), is(true));
}
@Test
public void testInterceptHandlesError() throws Exception {
context.register(EventListenerConfig1.class, Config1.class);
context.refresh();
TestApplicationEventListener1 listener1 = context.getBean(TestApplicationEventListener1.class);
@SuppressWarnings("unchecked")
ObjectStateMachine<TestStates,TestEvents> machine =
context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, ObjectStateMachine.class);
assertThat(machine.hasStateMachineError(), is(false));
machine.getStateMachineAccessor().doWithRegion(
function -> function.addStateMachineInterceptor(new StateMachineInterceptorAdapter<TestStates,TestEvents>() {
@Override
public Exception stateMachineError(StateMachine<TestStates, TestEvents> stateMachine,
Exception exception) {
return null;
}
}));
TestStateMachineListener listener2 = new TestStateMachineListener();
machine.addStateListener(listener2);
machine.start();
machine.setStateMachineError(new RuntimeException("myerror"));
assertThat(listener1.latch.await(1, TimeUnit.SECONDS), is(false));
assertThat(listener1.count, is(0));
assertThat(listener2.latch.await(1, TimeUnit.SECONDS), is(false));
assertThat(listener2.count, is(0));
assertThat(machine.hasStateMachineError(), is(false));
}
@Test
public void testErrorActive() throws Exception {
context.register(Config1.class);
context.refresh();
@SuppressWarnings("unchecked")
ObjectStateMachine<TestStates,TestEvents> machine =
context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, ObjectStateMachine.class);
assertThat(machine.hasStateMachineError(), is(false));
machine.start();
machine.setStateMachineError(new RuntimeException("myerror"));
assertThat(machine.hasStateMachineError(), is(true));
assertThat(machine.getState().getIds(), containsInAnyOrder(TestStates.S1));
machine.sendEvent(TestEvents.E1);
assertThat(machine.getState().getIds(), containsInAnyOrder(TestStates.S1));
}
@Test
public void testListenerErrorsCauseNoMalfunction() throws Exception {
context.register(EventListenerConfig2.class, Config1.class);
context.refresh();
@SuppressWarnings("unchecked")
ObjectStateMachine<TestStates,TestEvents> machine =
context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, ObjectStateMachine.class);
StartedStateMachineListener listener1 = new StartedStateMachineListener();
ErroringStateMachineListener listener2 = new ErroringStateMachineListener();
StateChangedStateMachineListener listener3 = new StateChangedStateMachineListener();
machine.addStateListener(listener1);
machine.addStateListener(listener2);
machine.start();
assertThat(listener1.latch.await(2, TimeUnit.SECONDS), is(true));
machine.addStateListener(listener3);
machine.sendEvent(TestEvents.E1);
assertThat(listener3.latch.await(2, TimeUnit.SECONDS), is(true));
assertThat(machine.getState().getIds(), containsInAnyOrder(TestStates.S2));
}
@Test
public void testListenerErrorsCauseNoMalfunction2() throws Exception {
context.register(EventListenerConfig2.class, Config1.class);
context.refresh();
@SuppressWarnings("unchecked")
ObjectStateMachine<TestStates,TestEvents> machine =
context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, ObjectStateMachine.class);
StartedStateMachineListener listener1 = new StartedStateMachineListener();
ErroringStateMachineListener2 listener2 = new ErroringStateMachineListener2();
StateChangedStateMachineListener listener3 = new StateChangedStateMachineListener();
machine.addStateListener(listener1);
machine.addStateListener(listener2);
machine.start();
assertThat(listener1.latch.await(2, TimeUnit.SECONDS), is(true));
machine.addStateListener(listener3);
machine.sendEvent(TestEvents.E1);
assertThat(listener3.latch.await(2, TimeUnit.SECONDS), is(true));
assertThat(machine.getState().getIds(), containsInAnyOrder(TestStates.S2));
}
@Configuration
@EnableStateMachine
static class Config1 extends EnumStateMachineConfigurerAdapter<TestStates, TestEvents> {
@Override
public void configure(StateMachineStateConfigurer<TestStates, TestEvents> states) throws Exception {
states
.withStates()
.initial(TestStates.S1)
.state(TestStates.S2)
.state(TestStates.S3)
.state(TestStates.S4);
}
@Override
public void configure(StateMachineTransitionConfigurer<TestStates, TestEvents> transitions) throws Exception {
transitions
.withExternal()
.source(TestStates.S1)
.target(TestStates.S2)
.event(TestEvents.E1)
.and()
.withExternal()
.source(TestStates.S2)
.target(TestStates.S3)
.event(TestEvents.E2)
.and()
.withExternal()
.source(TestStates.S3)
.target(TestStates.S4)
.event(TestEvents.E3)
.and()
.withExternal()
.source(TestStates.S4)
.target(TestStates.S3)
.event(TestEvents.E4);
}
}
@Configuration
@EnableStateMachine
static class Config2 extends EnumStateMachineConfigurerAdapter<TestStates, TestEvents> {
@Override
public void configure(StateMachineStateConfigurer<TestStates, TestEvents> states) throws Exception {
states
.withStates()
.initial(TestStates.S1)
.state(TestStates.S2)
.state(TestStates.S3);
}
@Override
public void configure(StateMachineTransitionConfigurer<TestStates, TestEvents> transitions) throws Exception {
transitions
.withExternal()
.source(TestStates.S1)
.target(TestStates.S2)
.event(TestEvents.E1)
.and()
.withExternal()
.source(TestStates.S2)
.target(TestStates.S3)
.event(TestEvents.E2);
}
}
@Configuration
static class EventListenerConfig1 {
@Bean
public TestApplicationEventListener1 testApplicationEventListener1() {
return new TestApplicationEventListener1();
}
@Bean
public TestApplicationEventListener2 testApplicationEventListener2() {
return new TestApplicationEventListener2();
}
}
@Configuration
static class EventListenerConfig2 {
@Bean
public ErroringApplicationEventListener1 erroringApplicationEventListener1() {
return new ErroringApplicationEventListener1();
}
}
static class TestStateMachineListener extends StateMachineListenerAdapter<TestStates, TestEvents> {
CountDownLatch latch = new CountDownLatch(1);
int count = 0;
@Override
public void stateMachineError(StateMachine<TestStates, TestEvents> stateMachine, Exception exception) {
count++;
latch.countDown();
}
}
static class TestApplicationEventListener1 implements ApplicationListener<StateMachineEvent> {
CountDownLatch latch = new CountDownLatch(1);
int count = 0;
@Override
public void onApplicationEvent(StateMachineEvent event) {
if (event instanceof OnStateMachineError) {
count++;
latch.countDown();
}
}
}
static class TestApplicationEventListener2 implements ApplicationListener<OnStateMachineError> {
CountDownLatch latch = new CountDownLatch(1);
int count = 0;
@Override
public void onApplicationEvent(OnStateMachineError event) {
count++;
latch.countDown();
}
}
static class ErroringApplicationEventListener1 implements ApplicationListener<StateMachineEvent> {
@Override
public void onApplicationEvent(StateMachineEvent event) {
throw new RuntimeException();
}
}
static class StateChangedStateMachineListener extends StateMachineListenerAdapter<TestStates, TestEvents> {
CountDownLatch latch = new CountDownLatch(1);
@Override
public void stateChanged(State<TestStates, TestEvents> from, State<TestStates, TestEvents> to) {
latch.countDown();
}
void reset(int a) {
latch = new CountDownLatch(a);
}
}
static class StartedStateMachineListener extends StateMachineListenerAdapter<TestStates, TestEvents> {
CountDownLatch latch = new CountDownLatch(1);
@Override
public void stateMachineStarted(StateMachine<TestStates, TestEvents> stateMachine) {
latch.countDown();
}
}
static class ErroringStateMachineListener implements StateMachineListener<TestStates, TestEvents> {
@Override
public void stateChanged(State<TestStates, TestEvents> from, State<TestStates, TestEvents> to) {
throw new RuntimeException();
}
@Override
public void stateEntered(State<TestStates, TestEvents> state) {
throw new RuntimeException();
}
@Override
public void stateExited(State<TestStates, TestEvents> state) {
throw new RuntimeException();
}
@Override
public void eventNotAccepted(Message<TestEvents> event) {
throw new RuntimeException();
}
@Override
public void transition(Transition<TestStates, TestEvents> transition) {
throw new RuntimeException();
}
@Override
public void transitionStarted(Transition<TestStates, TestEvents> transition) {
throw new RuntimeException();
}
@Override
public void transitionEnded(Transition<TestStates, TestEvents> transition) {
throw new RuntimeException();
}
@Override
public void stateMachineStarted(StateMachine<TestStates, TestEvents> stateMachine) {
throw new RuntimeException();
}
@Override
public void stateMachineStopped(StateMachine<TestStates, TestEvents> stateMachine) {
throw new RuntimeException();
}
@Override
public void stateMachineError(StateMachine<TestStates, TestEvents> stateMachine, Exception exception) {
throw new RuntimeException();
}
@Override
public void extendedStateChanged(Object key, Object value) {
throw new RuntimeException();
}
@Override
public void stateContext(StateContext<TestStates, TestEvents> stateContext) {
throw new RuntimeException();
}
}
static class ErroringStateMachineListener2 implements StateMachineListener<TestStates, TestEvents> {
@Override
public void stateChanged(State<TestStates, TestEvents> from, State<TestStates, TestEvents> to) {
throw new Error();
}
@Override
public void stateEntered(State<TestStates, TestEvents> state) {
throw new Error();
}
@Override
public void stateExited(State<TestStates, TestEvents> state) {
throw new Error();
}
@Override
public void eventNotAccepted(Message<TestEvents> event) {
throw new Error();
}
@Override
public void transition(Transition<TestStates, TestEvents> transition) {
throw new Error();
}
@Override
public void transitionStarted(Transition<TestStates, TestEvents> transition) {
throw new Error();
}
@Override
public void transitionEnded(Transition<TestStates, TestEvents> transition) {
throw new Error();
}
@Override
public void stateMachineStarted(StateMachine<TestStates, TestEvents> stateMachine) {
throw new Error();
}
@Override
public void stateMachineStopped(StateMachine<TestStates, TestEvents> stateMachine) {
throw new Error();
}
@Override
public void stateMachineError(StateMachine<TestStates, TestEvents> stateMachine, Exception exception) {
throw new Error();
}
@Override
public void extendedStateChanged(Object key, Object value) {
throw new Error();
}
@Override
public void stateContext(StateContext<TestStates, TestEvents> stateContext) {
throw new Error();
}
}
@Override
protected AnnotationConfigApplicationContext buildContext() {
return new AnnotationConfigApplicationContext();
}
@Test
public void testEvents() throws Exception {
context.register(EventListenerConfig1.class, Config1.class);
context.refresh();
TestApplicationEventListener1 listener1 = context.getBean(TestApplicationEventListener1.class);
TestApplicationEventListener2 listener3 = context.getBean(TestApplicationEventListener2.class);
@SuppressWarnings("unchecked")
ObjectStateMachine<TestStates,TestEvents> machine =
context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, ObjectStateMachine.class);
assertThat(machine.hasStateMachineError()).isFalse();
TestStateMachineListener listener2 = new TestStateMachineListener();
machine.addStateListener(listener2);
machine.start();
machine.setStateMachineError(new RuntimeException("myerror"));
assertThat(listener1.latch.await(1, TimeUnit.SECONDS)).isTrue();
assertThat(listener1.count).isEqualTo(1);
assertThat(listener3.latch.await(1, TimeUnit.SECONDS)).isTrue();
assertThat(listener3.count).isEqualTo(1);
assertThat(listener2.latch.await(1, TimeUnit.SECONDS)).isTrue();
assertThat(listener2.count).isEqualTo(1);
assertThat(machine.hasStateMachineError()).isTrue();
}
@Test
public void testInterceptHandlesError() throws Exception {
context.register(EventListenerConfig1.class, Config1.class);
context.refresh();
TestApplicationEventListener1 listener1 = context.getBean(TestApplicationEventListener1.class);
@SuppressWarnings("unchecked")
ObjectStateMachine<TestStates,TestEvents> machine =
context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, ObjectStateMachine.class);
assertThat(machine.hasStateMachineError()).isFalse();
machine.getStateMachineAccessor().doWithRegion(
function -> function.addStateMachineInterceptor(new StateMachineInterceptorAdapter<TestStates,TestEvents>() {
@Override
public Exception stateMachineError(StateMachine<TestStates, TestEvents> stateMachine,
Exception exception) {
return null;
}
}));
TestStateMachineListener listener2 = new TestStateMachineListener();
machine.addStateListener(listener2);
machine.start();
machine.setStateMachineError(new RuntimeException("myerror"));
assertThat(listener1.latch.await(1, TimeUnit.SECONDS)).isFalse();
assertThat(listener1.count).isEqualTo(0);
assertThat(listener2.latch.await(1, TimeUnit.SECONDS)).isFalse();
assertThat(listener2.count).isEqualTo(0);
assertThat(machine.hasStateMachineError()).isFalse();
}
@Test
public void testErrorActive() throws Exception {
context.register(Config1.class);
context.refresh();
@SuppressWarnings("unchecked")
ObjectStateMachine<TestStates,TestEvents> machine =
context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, ObjectStateMachine.class);
assertThat(machine.hasStateMachineError()).isFalse();
machine.start();
machine.setStateMachineError(new RuntimeException("myerror"));
assertThat(machine.hasStateMachineError()).isTrue();
assertThat(machine.getState().getIds()).containsExactlyInAnyOrder(TestStates.S1);
machine.sendEvent(TestEvents.E1);
assertThat(machine.getState().getIds()).containsExactlyInAnyOrder(TestStates.S1);
}
@Test
public void testListenerErrorsCauseNoMalfunction() throws Exception {
context.register(EventListenerConfig2.class, Config1.class);
context.refresh();
@SuppressWarnings("unchecked")
ObjectStateMachine<TestStates,TestEvents> machine =
context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, ObjectStateMachine.class);
StartedStateMachineListener listener1 = new StartedStateMachineListener();
ErroringStateMachineListener listener2 = new ErroringStateMachineListener();
StateChangedStateMachineListener listener3 = new StateChangedStateMachineListener();
machine.addStateListener(listener1);
machine.addStateListener(listener2);
machine.start();
assertThat(listener1.latch.await(2, TimeUnit.SECONDS)).isTrue();
machine.addStateListener(listener3);
machine.sendEvent(TestEvents.E1);
assertThat(listener3.latch.await(2, TimeUnit.SECONDS)).isTrue();
assertThat(machine.getState().getIds()).containsExactlyInAnyOrder(TestStates.S2);
}
@Test
public void testListenerErrorsCauseNoMalfunction2() throws Exception {
context.register(EventListenerConfig2.class, Config1.class);
context.refresh();
@SuppressWarnings("unchecked")
ObjectStateMachine<TestStates,TestEvents> machine =
context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, ObjectStateMachine.class);
StartedStateMachineListener listener1 = new StartedStateMachineListener();
ErroringStateMachineListener2 listener2 = new ErroringStateMachineListener2();
StateChangedStateMachineListener listener3 = new StateChangedStateMachineListener();
machine.addStateListener(listener1);
machine.addStateListener(listener2);
machine.start();
assertThat(listener1.latch.await(2, TimeUnit.SECONDS)).isTrue();
machine.addStateListener(listener3);
machine.sendEvent(TestEvents.E1);
assertThat(listener3.latch.await(2, TimeUnit.SECONDS)).isTrue();
assertThat(machine.getState().getIds()).containsExactlyInAnyOrder(TestStates.S2);
}
@Configuration
@EnableStateMachine
static class Config1 extends EnumStateMachineConfigurerAdapter<TestStates, TestEvents> {
@Override
public void configure(StateMachineStateConfigurer<TestStates, TestEvents> states) throws Exception {
states
.withStates()
.initial(TestStates.S1)
.state(TestStates.S2)
.state(TestStates.S3)
.state(TestStates.S4);
}
@Override
public void configure(StateMachineTransitionConfigurer<TestStates, TestEvents> transitions) throws Exception {
transitions
.withExternal()
.source(TestStates.S1)
.target(TestStates.S2)
.event(TestEvents.E1)
.and()
.withExternal()
.source(TestStates.S2)
.target(TestStates.S3)
.event(TestEvents.E2)
.and()
.withExternal()
.source(TestStates.S3)
.target(TestStates.S4)
.event(TestEvents.E3)
.and()
.withExternal()
.source(TestStates.S4)
.target(TestStates.S3)
.event(TestEvents.E4);
}
}
@Configuration
@EnableStateMachine
static class Config2 extends EnumStateMachineConfigurerAdapter<TestStates, TestEvents> {
@Override
public void configure(StateMachineStateConfigurer<TestStates, TestEvents> states) throws Exception {
states
.withStates()
.initial(TestStates.S1)
.state(TestStates.S2)
.state(TestStates.S3);
}
@Override
public void configure(StateMachineTransitionConfigurer<TestStates, TestEvents> transitions) throws Exception {
transitions
.withExternal()
.source(TestStates.S1)
.target(TestStates.S2)
.event(TestEvents.E1)
.and()
.withExternal()
.source(TestStates.S2)
.target(TestStates.S3)
.event(TestEvents.E2);
}
}
@Configuration
static class EventListenerConfig1 {
@Bean
public TestApplicationEventListener1 testApplicationEventListener1() {
return new TestApplicationEventListener1();
}
@Bean
public TestApplicationEventListener2 testApplicationEventListener2() {
return new TestApplicationEventListener2();
}
}
@Configuration
static class EventListenerConfig2 {
@Bean
public ErroringApplicationEventListener1 erroringApplicationEventListener1() {
return new ErroringApplicationEventListener1();
}
}
static class TestStateMachineListener extends StateMachineListenerAdapter<TestStates, TestEvents> {
CountDownLatch latch = new CountDownLatch(1);
int count = 0;
@Override
public void stateMachineError(StateMachine<TestStates, TestEvents> stateMachine, Exception exception) {
count++;
latch.countDown();
}
}
static class TestApplicationEventListener1 implements ApplicationListener<StateMachineEvent> {
CountDownLatch latch = new CountDownLatch(1);
int count = 0;
@Override
public void onApplicationEvent(StateMachineEvent event) {
if (event instanceof OnStateMachineError) {
count++;
latch.countDown();
}
}
}
static class TestApplicationEventListener2 implements ApplicationListener<OnStateMachineError> {
CountDownLatch latch = new CountDownLatch(1);
int count = 0;
@Override
public void onApplicationEvent(OnStateMachineError event) {
count++;
latch.countDown();
}
}
static class ErroringApplicationEventListener1 implements ApplicationListener<StateMachineEvent> {
@Override
public void onApplicationEvent(StateMachineEvent event) {
throw new RuntimeException();
}
}
static class StateChangedStateMachineListener extends StateMachineListenerAdapter<TestStates, TestEvents> {
CountDownLatch latch = new CountDownLatch(1);
@Override
public void stateChanged(State<TestStates, TestEvents> from, State<TestStates, TestEvents> to) {
latch.countDown();
}
void reset(int a) {
latch = new CountDownLatch(a);
}
}
static class StartedStateMachineListener extends StateMachineListenerAdapter<TestStates, TestEvents> {
CountDownLatch latch = new CountDownLatch(1);
@Override
public void stateMachineStarted(StateMachine<TestStates, TestEvents> stateMachine) {
latch.countDown();
}
}
static class ErroringStateMachineListener implements StateMachineListener<TestStates, TestEvents> {
@Override
public void stateChanged(State<TestStates, TestEvents> from, State<TestStates, TestEvents> to) {
throw new RuntimeException();
}
@Override
public void stateEntered(State<TestStates, TestEvents> state) {
throw new RuntimeException();
}
@Override
public void stateExited(State<TestStates, TestEvents> state) {
throw new RuntimeException();
}
@Override
public void eventNotAccepted(Message<TestEvents> event) {
throw new RuntimeException();
}
@Override
public void transition(Transition<TestStates, TestEvents> transition) {
throw new RuntimeException();
}
@Override
public void transitionStarted(Transition<TestStates, TestEvents> transition) {
throw new RuntimeException();
}
@Override
public void transitionEnded(Transition<TestStates, TestEvents> transition) {
throw new RuntimeException();
}
@Override
public void stateMachineStarted(StateMachine<TestStates, TestEvents> stateMachine) {
throw new RuntimeException();
}
@Override
public void stateMachineStopped(StateMachine<TestStates, TestEvents> stateMachine) {
throw new RuntimeException();
}
@Override
public void stateMachineError(StateMachine<TestStates, TestEvents> stateMachine, Exception exception) {
throw new RuntimeException();
}
@Override
public void extendedStateChanged(Object key, Object value) {
throw new RuntimeException();
}
@Override
public void stateContext(StateContext<TestStates, TestEvents> stateContext) {
throw new RuntimeException();
}
}
static class ErroringStateMachineListener2 implements StateMachineListener<TestStates, TestEvents> {
@Override
public void stateChanged(State<TestStates, TestEvents> from, State<TestStates, TestEvents> to) {
throw new Error();
}
@Override
public void stateEntered(State<TestStates, TestEvents> state) {
throw new Error();
}
@Override
public void stateExited(State<TestStates, TestEvents> state) {
throw new Error();
}
@Override
public void eventNotAccepted(Message<TestEvents> event) {
throw new Error();
}
@Override
public void transition(Transition<TestStates, TestEvents> transition) {
throw new Error();
}
@Override
public void transitionStarted(Transition<TestStates, TestEvents> transition) {
throw new Error();
}
@Override
public void transitionEnded(Transition<TestStates, TestEvents> transition) {
throw new Error();
}
@Override
public void stateMachineStarted(StateMachine<TestStates, TestEvents> stateMachine) {
throw new Error();
}
@Override
public void stateMachineStopped(StateMachine<TestStates, TestEvents> stateMachine) {
throw new Error();
}
@Override
public void stateMachineError(StateMachine<TestStates, TestEvents> stateMachine, Exception exception) {
throw new Error();
}
@Override
public void extendedStateChanged(Object key, Object value) {
throw new Error();
}
@Override
public void stateContext(StateContext<TestStates, TestEvents> stateContext) {
throw new Error();
}
}
@SuppressWarnings("unchecked")
@Test
public void testActionEntryError() throws Exception {
context.register(Config3.class);
context.refresh();
ObjectStateMachine<String, String> machine =
context.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, ObjectStateMachine.class);
machine.start();
assertThat(machine.getState().getIds()).containsExactlyInAnyOrder("S2");
}
@Configuration
@EnableStateMachine
static class Config3 extends StateMachineConfigurerAdapter<String, String> {
@Override
public void configure(StateMachineStateConfigurer<String, String> states) throws Exception {
states
.withStates()
.initial("SI")
.state("S1")
.stateEntry("S2", (context) -> {
throw new RuntimeException("error");
});
}
@Override
public void configure(StateMachineTransitionConfigurer<String, String> transitions) throws Exception {
transitions
.withExternal()
.source("SI")
.target("S1")
.and()
.withExternal()
.source("S1")
.target("S2");
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import java.lang.reflect.Method;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachineEventResult.ResultType;
import org.springframework.statemachine.action.Action;
import org.springframework.statemachine.config.StateMachineFactory;
import org.springframework.statemachine.persist.StateMachinePersister;
@@ -36,7 +37,7 @@ import reactor.test.StepVerifier;
/**
* Utils for tests.
*
*
* @author Janne Valkealahti
*
*/
@@ -107,6 +108,22 @@ public class TestUtils {
.verifyComplete();
}
public static <S, E> void doSendEventAndConsumeResultAsDenied(StateMachine<S, E> stateMachine, E event) {
StepVerifier.create(stateMachine.sendEvent(eventAsMono(event)))
.consumeNextWith(result -> {
assertThat(result.getResultType()).isEqualTo(ResultType.DENIED);
})
.verifyComplete();
}
public static <S, E> void doSendEventAndConsumeResultAsDenied(StateMachine<S, E> stateMachine, Message<E> event) {
StepVerifier.create(stateMachine.sendEvent(eventAsMono(event)))
.consumeNextWith(result -> {
assertThat(result.getResultType()).isEqualTo(ResultType.DENIED);
})
.verifyComplete();
}
@SuppressWarnings("unchecked")
public static <T> T readField(String name, Object target) throws Exception {
Field field = null;
@@ -169,5 +186,5 @@ public class TestUtils {
method.setAccessible(true);
return (T) ReflectionUtils.invokeMethod(method, target, args);
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.springframework.statemachine.TestUtils.doSendEventAndConsumeAll;
import static org.springframework.statemachine.TestUtils.doSendEventAndConsumeResultAsDenied;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -60,6 +61,18 @@ public abstract class AbstractSecurityTests extends AbstractStateMachineTests {
assertThat(machine.getState().getIds(), containsInAnyOrder(States.S0));
}
protected static void assertTransitionDeniedResultAsDenied(StateMachine<States, Events> machine, TestListener listener) throws Exception {
assertThat(listener.stateChangedLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat(listener.stateChangedCount, is(1));
assertThat(machine.getState().getIds(), containsInAnyOrder(States.S0));
listener.reset(1);
doSendEventAndConsumeResultAsDenied(machine, Events.A);
assertThat(listener.stateChangedLatch.await(2, TimeUnit.SECONDS), is(false));
assertThat(listener.stateChangedCount, is(0));
assertThat(machine.getState().getIds(), containsInAnyOrder(States.S0));
}
protected static enum States {
S0, S1;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,7 +35,7 @@ public class EventSecurityTests extends AbstractSecurityTests {
public void testNoSecurityContext() throws Exception {
TestListener listener = new TestListener();
StateMachine<States, Events> machine = buildMachine(listener, "ROLE_ANONYMOUS", ComparisonType.ANY, null);
assertTransitionDenied(machine, listener);
assertTransitionDeniedResultAsDenied(machine, listener);
}
@Test