Add listener api for extended state variables

- New extendedStateChanged method in StateMachineListener and
  its wingman OnExtendedStateChanged in context events.
- DefaultExtendedState is now using ObservableMap to get notify
  when individual variables has been modified.
- Fixes #85
This commit is contained in:
Janne Valkealahti
2015-08-09 14:56:27 +01:00
parent 05f69070d8
commit e8debdc5a5
12 changed files with 416 additions and 1 deletions

View File

@@ -44,4 +44,26 @@ public interface ExtendedState {
*/
<T> T get(Object key, Class<T> type);
/**
* Sets the extended state change listener.
*
* @param listener the new extended state change listener
*/
void setExtendedStateChangeListener(ExtendedStateChangeListener listener);
/**
* The listener interface for receiving extended state change events.
*/
public interface ExtendedStateChangeListener {
/**
* Called when extended state variable has been changed.
*
* @param key the key
* @param value the value
*/
void changed(Object key, Object value);
}
}

View File

@@ -122,4 +122,11 @@ public class DefaultStateMachineEventPublisher implements StateMachineEventPubli
}
}
@Override
public void publishExtendedStateChanged(Object source, Object key, Object value) {
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new OnExtendedStateChanged(source, key, value));
}
}
}

View File

@@ -0,0 +1,66 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.statemachine.event;
/**
* Generic event representing that extended state variable has been changed.
*
* @author Janne Valkealahti
*
*/
@SuppressWarnings("serial")
public class OnExtendedStateChanged extends StateMachineEvent {
private final Object key;
private final Object value;
/**
* Instantiates a new on extended state changed.
*
* @param source the source
* @param key the key
* @param value the value
*/
public OnExtendedStateChanged(Object source, Object key, Object value) {
super(source);
this.key = key;
this.value = value;
}
/**
* Gets the modified extended state variable key.
*
* @return the key
*/
public Object getKey() {
return key;
}
/**
* Gets the modified extended state variable value.
*
* @return the value
*/
public Object getValue() {
return value;
}
@Override
public String toString() {
return "OnExtendedStateChanged [key=" + key + ", value=" + value + "]";
}
}

View File

@@ -110,4 +110,13 @@ public interface StateMachineEventPublisher {
*/
void publishStateMachineError(Object source, StateMachine<?, ?> stateMachine, Exception exception);
/**
* Publish extended state changed.
*
* @param source the source
* @param key the key
* @param value the value
*/
void publishExtendedStateChanged(Object source, Object key, Object value);
}

View File

@@ -113,4 +113,12 @@ public class CompositeStateMachineListener<S,E> extends AbstractCompositeListene
}
}
@Override
public void extendedStateChanged(Object key, Object value) {
for (Iterator<StateMachineListener<S, E>> iterator = getListeners().reverse(); iterator.hasNext();) {
StateMachineListener<S, E> listener = iterator.next();
listener.extendedStateChanged(key, value);
}
}
}

View File

@@ -102,4 +102,12 @@ public interface StateMachineListener<S,E> {
*/
void stateMachineError(StateMachine<S, E> stateMachine, Exception exception);
/**
* Notified when extended state variable is either added, modified or removed.
*
* @param key the variable key
* @param value the variable value
*/
void extendedStateChanged(Object key, Object value);
}

View File

@@ -71,4 +71,8 @@ public class StateMachineListenerAdapter<S, E> implements StateMachineListener<S
public void stateMachineError(StateMachine<S, E> stateMachine, Exception exception) {
}
@Override
public void extendedStateChanged(Object key, Object value) {
}
}

View File

@@ -39,6 +39,7 @@ import org.springframework.statemachine.ExtendedState;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.StateMachineContext;
import org.springframework.statemachine.ExtendedState.ExtendedStateChangeListener;
import org.springframework.statemachine.access.StateMachineAccess;
import org.springframework.statemachine.access.StateMachineAccessor;
import org.springframework.statemachine.access.StateMachineFunction;
@@ -213,6 +214,13 @@ public abstract class AbstractStateMachine<S, E> extends StateMachineObjectSuppo
&& initialState.getPseudoState().getKind() == PseudoStateKind.INITIAL,
"Initial state's pseudostate kind must be INITIAL");
extendedState.setExtendedStateChangeListener(new ExtendedStateChangeListener() {
@Override
public void changed(Object key, Object value) {
notifyExtendedStateChanged(key, value);
}
});
// process given transitions
for (Transition<S, E> transition : transitions) {
Trigger<S, E> trigger = transition.getTrigger();

View File

@@ -19,6 +19,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.statemachine.ExtendedState;
import org.springframework.statemachine.support.ObservableMap.MapChangeListener;
/**
* Default implementation of a {@link ExtendedState}.
@@ -29,12 +30,14 @@ import org.springframework.statemachine.ExtendedState;
public class DefaultExtendedState implements ExtendedState {
private final Map<Object, Object> variables;
private ExtendedStateChangeListener listener;
/**
* Instantiates a new default extended state.
*/
public DefaultExtendedState() {
this.variables = new ConcurrentHashMap<Object, Object>();
this.variables = new ObservableMap<Object, Object>(new ConcurrentHashMap<Object, Object>(),
new LocalMapChangeListener());
}
/**
@@ -65,9 +68,39 @@ public class DefaultExtendedState implements ExtendedState {
return (T) value;
}
@Override
public void setExtendedStateChangeListener(ExtendedStateChangeListener listener) {
this.listener = listener;
}
@Override
public String toString() {
return "DefaultExtendedState [variables=" + variables + "]";
}
private class LocalMapChangeListener implements MapChangeListener<Object, Object> {
@Override
public void added(Object key, Object value) {
if (listener != null) {
listener.changed(key, value);
}
}
@Override
public void changed(Object key, Object value) {
if (listener != null) {
listener.changed(key, value);
}
}
@Override
public void removed(Object key, Object value) {
if (listener != null) {
listener.changed(key, value);
}
}
}
}

View File

@@ -0,0 +1,196 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.statemachine.support;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.util.Assert;
/**
* Utility class which wraps {@link Map} and notifies
* {@link MapChangeListener} of changes for individual
* change operations.
*
* @author Janne Valkealahti
*
* @param <K> the type of key
* @param <V> the type of value
*/
public class ObservableMap<K, V> implements Map<K, V> {
private volatile Map<K, V> delegate;
private volatile MapChangeListener<K, V> listener;
/**
* Instantiates a new observable map.
*/
public ObservableMap() {
// default constructor needed for kryo, thus
// we create delegate here, listener not needed.
delegate = new ConcurrentHashMap<K, V>();
}
/**
* Instantiates a new observable map.
*
* @param map the delegating map
* @param listener the map change listener
*/
public ObservableMap(Map<K, V> map, MapChangeListener<K, V> listener) {
Assert.notNull(map, "Delegating map must be set");
Assert.notNull(listener, "Listener must be set");
this.delegate = map;
this.listener = listener;
}
@Override
public int size() {
return delegate.size();
}
@Override
public boolean isEmpty() {
return delegate.isEmpty();
}
@Override
public boolean containsKey(Object key) {
return delegate.containsKey(key);
}
@Override
public boolean containsValue(Object value) {
return delegate.containsValue(value);
}
@Override
public V get(Object key) {
return delegate.get(key);
}
@Override
public V put(K key, V value) {
V put = delegate.put(key, value);
if (listener != null) {
if (put == null) {
listener.added(key, value);
} else if (value != null && !value.equals(put)) {
listener.changed(key, value);
}
}
return put;
}
@SuppressWarnings("unchecked")
@Override
public V remove(Object key) {
V remove = delegate.remove(key);
if (listener != null && remove != null) {
listener.removed((K)key, remove);
}
return remove;
}
@Override
public void putAll(Map<? extends K, ? extends V> m) {
delegate.putAll(m);
}
@Override
public void clear() {
delegate.clear();
}
@Override
public Set<K> keySet() {
return delegate.keySet();
}
@Override
public Collection<V> values() {
return delegate.values();
}
@Override
public Set<java.util.Map.Entry<K, V>> entrySet() {
return delegate.entrySet();
}
/**
* Gets the delegating map instance.
*
* @return the delegate
*/
public Map<K, V> getDelegate() {
return delegate;
}
/**
* Sets the delegate.
*
* @param delegate the delegate
*/
public void setDelegate(Map<K, V> delegate) {
this.delegate = delegate;
}
/**
* Sets the map change listener.
*
* @param listener the listener
*/
public void setListener(MapChangeListener<K, V> listener) {
this.listener = listener;
}
/**
* The listener interface for receiving map change events.
*
* @param <K> the key type
* @param <V> the value type
*/
public interface MapChangeListener<K, V> {
/**
* Called when new entry is added.
*
* @param key the key
* @param value the value
*/
void added(K key, V value);
/**
* Called when entry has been changed.
*
* @param key the key
* @param value the value
*/
void changed(K key, V value);
/**
* Called when entry has been removed.
*
* @param key the key
* @param value the value
*/
void removed(K key, V value);
}
}

View File

@@ -193,6 +193,16 @@ public abstract class StateMachineObjectSupport<S, E> extends LifecycleObjectSup
}
}
protected void notifyExtendedStateChanged(Object key, Object value) {
stateListener.extendedStateChanged(key, value);
if (contextEventsEnabled) {
StateMachineEventPublisher eventPublisher = getStateMachineEventPublisher();
if (eventPublisher != null) {
eventPublisher.publishExtendedStateChanged(this, key, value);
}
}
}
protected void stateChangedInRelay() {
// TODO: this is a temporary tweak to know when state is
// changed in a submachine/regions order to give
@@ -270,6 +280,11 @@ public abstract class StateMachineObjectSupport<S, E> extends LifecycleObjectSup
stateListener.stateMachineError(stateMachine, exception);
}
@Override
public void extendedStateChanged(Object key, Object value) {
stateListener.extendedStateChanged(key, value);
}
}
}

View File

@@ -103,6 +103,26 @@ public class ListenerTests extends AbstractStateMachineTests {
ctx.close();
}
@Test
public void testExtendedStateEvents() throws Exception {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(Config2.class);
assertTrue(ctx.containsBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE));
@SuppressWarnings("unchecked")
ObjectStateMachine<TestStates,TestEvents> machine =
ctx.getBean(StateMachineSystemConstants.DEFAULT_ID_STATEMACHINE, ObjectStateMachine.class);
TestStateMachineListener listener = new TestStateMachineListener();
machine.addStateListener(listener);
machine.start();
machine.getExtendedState().getVariables().put("foo", "jee");
assertThat(listener.extendedLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat(listener.extended.size(), is(1));
assertThat(listener.extended.get(0).key, is("foo"));
assertThat(listener.extended.get(0).value, is("jee"));
ctx.close();
}
private static class LoggingAction implements Action<TestStates, TestEvents> {
private static final Log log = LogFactory.getLog(LoggingAction.class);
@@ -126,6 +146,9 @@ public class ListenerTests extends AbstractStateMachineTests {
volatile int started = 0;
volatile int stopped = 0;
CountDownLatch stopLatch = new CountDownLatch(1);
ArrayList<Holder2> extended = new ArrayList<Holder2>();
CountDownLatch extendedLatch = new CountDownLatch(1);
@Override
public void stateChanged(State<TestStates, TestEvents> from, State<TestStates, TestEvents> to) {
@@ -149,6 +172,16 @@ public class ListenerTests extends AbstractStateMachineTests {
}
}
static class Holder2 {
Object key;
Object value;
public Holder2(Object key, Object value) {
this.key = key;
this.value = value;
}
}
@Override
public void eventNotAccepted(Message<TestEvents> event) {
}
@@ -180,6 +213,12 @@ public class ListenerTests extends AbstractStateMachineTests {
public void stateMachineError(StateMachine<TestStates, TestEvents> stateMachine, Exception exception) {
}
@Override
public void extendedStateChanged(Object key, Object value) {
extended.add(new Holder2(key, value));
extendedLatch.countDown();
}
}
@Configuration