Enhancements to tasks recipe

- Fix AbstractStateMachineFactory so that it does not try
  to create transitions with null source or target. Basically we
  just skip transition source or target which doesn't belong to
  region.
- Change DefaultExtendedState to use ConcurrentHashMap so that
  there's less change to get into trouble with concurrency.
- Add statechart for tasks recipe.
- Fix various concepts in TasksHandler and its tests.
- Prepare some examples for docs.
- Fixes #74
This commit is contained in:
Janne Valkealahti
2015-07-25 17:09:54 +01:00
parent 0fba91cbf2
commit 178ee816a4
7 changed files with 399 additions and 46 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

View File

@@ -0,0 +1,46 @@
+----------------------------------------------------------------------------+
| SM |
+----------------------------------------------------------------------------+
| |
| +---------------------------+ |
| FORK | TASKS | JOIN |
| | +---------------------------+ | |
| +-------------+ RUN | | +-------------------+ | | |
| *-->| READY |----->|----->| *-->| TASK_id_INITIAL | |---->|----+ |
| +-------------+ | | +-------------------+ | | | |
| ^ ^ | | | | | | |
| | | | v | | |
| | | | +-------------------+ | | |
| | | | | TASK_id | | | |
| | | | +-------------------+ | | |
| | | |===========================| | |
| | | | +-------------------+ | | |
| | | | *-->| TASK_id_INITIAL | | | |
| | | | +-------------------+ | | |
| | | | | | | |
| | | | v | | |
| | | | +-------------------+ | | |
| | | | | TASK_id | | | |
| | | | +-------------------+ | | |
| | | +---------------------------+ | |
| | | | |
| | | [OK] +------------+ | |
| | +--------------------------| CHOICE |<----------------+ |
| | +------------+ |
| | | |
| | | [ERROR] |
| | v |
| | +-----------------------------------------------+ |
| | | ERROR | |
| | +-----------------------------------------------+ |
| | CONTINUE | +-------------+ FALLBACK +-------------+ | |
| +-------------| *-->| AUTOMATIC |--------->| MANUAL | | |
| | | | | | | |
| | | | | FIX | | |
| | | | | +-----+ | | |
| | | | | | | | | |
| | | | | | v | | |
| | +-------------+ +-------------+ | |
| +-----------------------------------------------+ |
| |
+----------------------------------------------------------------------------+

View File

@@ -510,7 +510,7 @@ public abstract class AbstractStateMachineFactory<S, E> extends LifecycleObjectS
if (transitionData.getKind() == TransitionKind.EXTERNAL) {
// TODO can we do this?
if (stateMap.get(source) == null && stateMap.get(target) == null) {
if (stateMap.get(source) == null || stateMap.get(target) == null) {
continue;
}
DefaultExternalTransition<S, E> transition = new DefaultExternalTransition<S, E>(stateMap.get(source),

View File

@@ -15,8 +15,8 @@
*/
package org.springframework.statemachine.support;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.statemachine.ExtendedState;
@@ -34,7 +34,7 @@ public class DefaultExtendedState implements ExtendedState {
* Instantiates a new default extended state.
*/
public DefaultExtendedState() {
this.variables = new HashMap<Object, Object>();
this.variables = new ConcurrentHashMap<Object, Object>();
}
/**

View File

@@ -151,6 +151,23 @@ public class TasksHandler {
return new Builder();
}
/**
* Mark all extended state variables related to tasks fixed.
*/
public void markAllTasksFixed() {
Map<Object, Object> variables = getStateMachine().getExtendedState().getVariables();
for (Entry<Object, Object> entry : variables.entrySet()) {
if (entry.getKey() instanceof String && ((String)entry.getKey()).startsWith(STATE_TASKS_PREFIX)) {
if (entry.getValue() instanceof Integer) {
Integer value = (Integer) entry.getValue();
if (value < 0) {
variables.put(entry.getKey(), 0);
}
}
}
}
}
private StateMachine<String, String> buildStateMachine(List<TaskWrapper> tasks, TaskExecutor taskExecutor)
throws Exception {
StateMachineBuilder.Builder<String, String> builder = StateMachineBuilder.builder();
@@ -201,6 +218,7 @@ public class TasksHandler {
stateMachineTransitionConfigurer
.withExternal()
.state(parent)
.source(initial)
.target(task);
}
@@ -425,11 +443,40 @@ public class TasksHandler {
};
}
/**
* {@link Action} calls {@link TasksListener#onTasksAutomaticFix(StateContext)}
* before checking status of extended state variables related to tasks. If all
* variables are ok, event {@code EVENT_CONTINUE} is sent, otherwise event
* {@code EVENT_FALLBACK} is send which takes state machine into a manual handling.
*
* @return the action
*/
private Action<String, String> automaticAction() {
return new Action<String, String>() {
@Override
public void execute(StateContext<String, String> context) {
listener.onTasksAutomaticFix(TasksHandler.this, context);
boolean hasErrors = false;
Map<Object, Object> variables = context.getExtendedState().getVariables();
for (Entry<Object, Object> entry : variables.entrySet()) {
if (entry.getKey() instanceof String && ((String)entry.getKey()).startsWith(STATE_TASKS_PREFIX)) {
if (entry.getValue() instanceof Integer) {
Integer value = (Integer) entry.getValue();
if (value < 0) {
hasErrors = true;
break;
}
}
}
}
if (hasErrors) {
context.getStateMachine().sendEvent(EVENT_FALLBACK);
} else {
context.getStateMachine().sendEvent(EVENT_CONTINUE);
}
}
};
}
@@ -451,7 +498,7 @@ public class TasksHandler {
if (entry.getValue() instanceof Integer) {
Integer value = (Integer) entry.getValue();
if (value < 0) {
variables.put(entry.getValue(), 0);
variables.put(entry.getKey(), 0);
}
}
}
@@ -460,6 +507,49 @@ public class TasksHandler {
};
}
/**
* Adapter class for {@link TasksListener}.
*/
public static class TasksListenerAdapter implements TasksListener {
@Override
public void onTasksStarted() {
}
@Override
public void onTasksContinue() {
}
@Override
public void onTaskPreExecute(Object id) {
}
@Override
public void onTaskPostExecute(Object id) {
}
@Override
public void onTaskFailed(Object id, Exception exception) {
}
@Override
public void onTaskSuccess(Object id) {
}
@Override
public void onTasksSuccess() {
}
@Override
public void onTasksError() {
}
@Override
public void onTasksAutomaticFix(TasksHandler handler, StateContext<String, String> context) {
}
}
/**
* {@code TasksListener} is a generic interface listening tasks
* execution events. Methods in this interface will be called in a
@@ -520,6 +610,16 @@ public class TasksHandler {
* tasks executed with an error.
*/
void onTasksError();
/**
* Called when tasks execution resulted an error and AUTOMATIC state
* is entered. This is a moment where extended state variables can be
* modified to allow continue into a READY state.
*
* @param handler the tasks handler
* @param context the state context
*/
void onTasksAutomaticFix(TasksHandler handler, StateContext<String, String> context);
}
private class CompositeTasksListener extends AbstractCompositeListener<TasksListener> implements
@@ -581,6 +681,13 @@ public class TasksHandler {
}
}
@Override
public void onTasksAutomaticFix(TasksHandler handler, StateContext<String, String> context) {
for (Iterator<TasksListener> iterator = getListeners().reverse(); iterator.hasNext();) {
iterator.next().onTasksAutomaticFix(handler, context);
}
}
}
/**
@@ -613,7 +720,7 @@ public class TasksHandler {
}
/**
* {@link Action} which is execution with every registered {@link Runnable}.
* {@link Action} which is executed with every registered {@link Runnable}.
*/
private class LocalRunnableAction extends RunnableAction {

View File

@@ -26,10 +26,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.listener.StateMachineListenerAdapter;
import org.springframework.statemachine.recipes.tasks.TasksHandler;
import org.springframework.statemachine.recipes.tasks.TasksHandler.TasksListener;
import org.springframework.statemachine.recipes.tasks.TasksHandler.TasksListenerAdapter;
import org.springframework.statemachine.state.State;
import org.springframework.statemachine.transition.Transition;
@@ -84,7 +85,7 @@ public class TasksHandlerTests {
}
@Test
public void testRunFailAndContinue() throws InterruptedException {
public void testRunFailAndFixAndContinue() throws InterruptedException {
TasksHandler handler = TasksHandler.builder()
.task("1", sleepRunnable())
.task("2", sleepRunnable())
@@ -92,7 +93,7 @@ public class TasksHandlerTests {
.build();
TestListener listener = new TestListener();
listener.reset(11, 0, 0);
listener.reset(12, 0, 0);
StateMachine<String, String> machine = handler.getStateMachine();
machine.addStateListener(listener);
machine.start();
@@ -101,8 +102,8 @@ public class TasksHandlerTests {
handler.runTasks();
assertThat(listener.stateChangedLatch.await(8, TimeUnit.SECONDS), is(true));
assertThat(listener.stateChangedCount, is(11));
assertThat(machine.getState().getIds(), contains(TasksHandler.STATE_ERROR, TasksHandler.STATE_AUTOMATIC));
assertThat(listener.stateChangedCount, is(12));
assertThat(machine.getState().getIds(), contains(TasksHandler.STATE_ERROR, TasksHandler.STATE_MANUAL));
handler.fixCurrentProblems();
handler.continueFromError();
@@ -112,6 +113,32 @@ public class TasksHandlerTests {
assertThat(machine.getState().getIds(), contains(TasksHandler.STATE_READY));
}
@Test
public void testRunFailAndAutomaticFix() throws InterruptedException {
TasksHandler handler = TasksHandler.builder()
.task("1", sleepRunnable())
.task("2", sleepRunnable())
.task("3", failRunnable())
.build();
TestTasksListener tasksListener = new TestTasksListener();
tasksListener.fix = true;
handler.addTasksListener(tasksListener);
TestListener listener = new TestListener();
listener.reset(12, 0, 0);
StateMachine<String, String> machine = handler.getStateMachine();
machine.addStateListener(listener);
machine.start();
assertThat(listener.stateMachineStartedLatch.await(1, TimeUnit.SECONDS), is(true));
handler.runTasks();
assertThat(listener.stateChangedLatch.await(8, TimeUnit.SECONDS), is(true));
assertThat(listener.stateChangedCount, is(12));
assertThat(machine.getState().getIds(), contains(TasksHandler.STATE_READY));
}
@Test
public void testDagSingleRoot() throws InterruptedException {
TasksHandler handler = TasksHandler.builder()
@@ -249,7 +276,7 @@ public class TasksHandlerTests {
.build();
TestListener listener = new TestListener();
listener.reset(11, 0, 0);
listener.reset(12, 0, 0);
StateMachine<String, String> machine = handler.getStateMachine();
machine.addStateListener(listener);
machine.start();
@@ -259,8 +286,8 @@ public class TasksHandlerTests {
handler.runTasks();
assertThat(listener.stateChangedLatch.await(8, TimeUnit.SECONDS), is(true));
assertThat(listener.stateChangedCount, is(11));
assertThat(machine.getState().getIds(), contains(TasksHandler.STATE_ERROR, TasksHandler.STATE_AUTOMATIC));
assertThat(listener.stateChangedCount, is(12));
assertThat(machine.getState().getIds(), contains(TasksHandler.STATE_ERROR, TasksHandler.STATE_MANUAL));
listener.reset(1, 0, 0);
handler.fixCurrentProblems();
@@ -354,7 +381,9 @@ public class TasksHandlerTests {
}
private class TestTasksListener implements TasksListener {
private static class TestTasksListener extends TasksListenerAdapter {
final Object lock = new Object();
volatile CountDownLatch onTasksStartedLatch = new CountDownLatch(1);
volatile CountDownLatch onTasksContinueLatch = new CountDownLatch(1);
@@ -374,71 +403,99 @@ public class TasksHandlerTests {
volatile int onTasksSuccess;
volatile int onTasksError;
volatile boolean fix = false;
@Override
public void onTasksStarted() {
onTasksStarted++;
onTasksStartedLatch.countDown();
synchronized (lock) {
onTasksStarted++;
onTasksStartedLatch.countDown();
}
}
@Override
public void onTasksContinue() {
onTasksContinue++;
onTasksContinueLatch.countDown();
synchronized (lock) {
onTasksContinue++;
onTasksContinueLatch.countDown();
}
}
@Override
public void onTaskPreExecute(Object id) {
onTaskPreExecute++;
onTaskPreExecuteLatch.countDown();
synchronized (lock) {
onTaskPreExecute++;
onTaskPreExecuteLatch.countDown();
}
}
@Override
public void onTaskPostExecute(Object id) {
onTaskPostExecute++;
onTaskPostExecuteLatch.countDown();
synchronized (lock) {
onTaskPostExecute++;
onTaskPostExecuteLatch.countDown();
}
}
@Override
public void onTaskFailed(Object id, Exception exception) {
onTaskFailed++;
onTaskFailedLatch.countDown();
synchronized (lock) {
onTaskFailed++;
onTaskFailedLatch.countDown();
}
}
@Override
public void onTaskSuccess(Object id) {
onTaskSuccess++;
onTaskSuccessLatch.countDown();
synchronized (lock) {
onTaskSuccess++;
onTaskSuccessLatch.countDown();
}
}
@Override
public void onTasksSuccess() {
onTasksSuccess++;
onTasksSuccessLatch.countDown();
synchronized (lock) {
onTasksSuccess++;
onTasksSuccessLatch.countDown();
}
}
@Override
public void onTasksError() {
onTasksError++;
onTasksErrorLatch.countDown();
synchronized (lock) {
onTasksError++;
onTasksErrorLatch.countDown();
}
}
@Override
public void onTasksAutomaticFix(TasksHandler handler, StateContext<String, String> context) {
if (!fix) {
return;
}
handler.markAllTasksFixed();
}
public void reset(int c1, int c2, int c3, int c4, int c5, int c6, int c7, int c8) {
onTasksStartedLatch = new CountDownLatch(c1);
onTasksContinueLatch = new CountDownLatch(c2);
onTaskPreExecuteLatch = new CountDownLatch(c3);
onTaskPostExecuteLatch = new CountDownLatch(c4);
onTaskFailedLatch = new CountDownLatch(c5);
onTaskSuccessLatch = new CountDownLatch(c6);
onTasksSuccessLatch = new CountDownLatch(c7);
onTasksErrorLatch = new CountDownLatch(c8);
onTasksStarted = 0;
onTasksContinue = 0;
onTaskPreExecute = 0;
onTaskPostExecute = 0;
onTaskFailed = 0;
onTaskSuccess = 0;
onTasksSuccess = 0;
onTasksError = 0;
synchronized (lock) {
onTasksStartedLatch = new CountDownLatch(c1);
onTasksContinueLatch = new CountDownLatch(c2);
onTaskPreExecuteLatch = new CountDownLatch(c3);
onTaskPostExecuteLatch = new CountDownLatch(c4);
onTaskFailedLatch = new CountDownLatch(c5);
onTaskSuccessLatch = new CountDownLatch(c6);
onTasksSuccessLatch = new CountDownLatch(c7);
onTasksErrorLatch = new CountDownLatch(c8);
onTasksStarted = 0;
onTasksContinue = 0;
onTaskPreExecute = 0;
onTaskPostExecute = 0;
onTaskFailed = 0;
onTaskSuccess = 0;
onTasksSuccess = 0;
onTasksError = 0;
}
}
}

View File

@@ -0,0 +1,143 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.statemachine.recipes.docs;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.recipes.tasks.TasksHandler;
import org.springframework.statemachine.recipes.tasks.TasksHandler.TasksListenerAdapter;
public class DocsTasksSampleTests {
public void sample1() {
// tag::snippetB[]
TasksHandler handler = TasksHandler.builder()
.task("1", sleepRunnable())
.task("2", sleepRunnable())
.task("3", sleepRunnable())
.build();
handler.runTasks();
// end::snippetB[]
}
public void sample2() {
// tag::snippetC[]
MyTasksListener listener1 = new MyTasksListener();
MyTasksListener listener2 = new MyTasksListener();
TasksHandler handler = TasksHandler.builder()
.task("1", sleepRunnable())
.task("2", sleepRunnable())
.task("3", sleepRunnable())
.listener(listener1)
.build();
handler.addTasksListener(listener2);
handler.removeTasksListener(listener2);
handler.runTasks();
// end::snippetC[]
}
public void sample3() {
// tag::snippetD[]
TasksHandler handler = TasksHandler.builder()
.task("1", sleepRunnable())
.task("1", "12", sleepRunnable())
.task("1", "13", sleepRunnable())
.task("2", sleepRunnable())
.task("2", "22", sleepRunnable())
.task("2", "23", sleepRunnable())
.task("3", sleepRunnable())
.task("3", "32", sleepRunnable())
.task("3", "33", sleepRunnable())
.build();
handler.runTasks();
// end::snippetD[]
}
public void sample4() {
// tag::snippetE[]
TasksHandler handler = TasksHandler.builder()
.task("1", sleepRunnable())
.task("2", sleepRunnable())
.task("3", sleepRunnable())
.build();
handler.runTasks();
handler.fixCurrentProblems();
handler.continueFromError();
// end::snippetE[]
}
// tag::snippetAA[]
private static Runnable sleepRunnable() {
return new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
}
};
}
// end::snippetAA[]
// tag::snippetAB[]
private static class MyTasksListener extends TasksListenerAdapter {
@Override
public void onTasksStarted() {
}
@Override
public void onTasksContinue() {
}
@Override
public void onTaskPreExecute(Object id) {
}
@Override
public void onTaskPostExecute(Object id) {
}
@Override
public void onTaskFailed(Object id, Exception exception) {
}
@Override
public void onTaskSuccess(Object id) {
}
@Override
public void onTasksSuccess() {
}
@Override
public void onTasksError() {
}
@Override
public void onTasksAutomaticFix(TasksHandler handler, StateContext<String, String> context) {
}
}
// end::snippetAB[]
}