Fix event overflow

- Fixes #89
- Added new method into EnsembleListeger which can
  be used to listen ensemble errors.
This commit is contained in:
Janne Valkealahti
2015-07-25 08:31:03 +01:00
parent 74f2f72ed6
commit 0fba91cbf2
8 changed files with 269 additions and 3 deletions

View File

@@ -56,4 +56,12 @@ public class CompositeEnsembleListener<S, E> extends AbstractCompositeListener<E
}
}
@Override
public void ensembleError(StateMachineEnsembleException exception) {
for (Iterator<EnsembleListeger<S, E>> iterator = getListeners().reverse(); iterator.hasNext();) {
EnsembleListeger<S, E> listener = iterator.next();
listener.ensembleError(exception);
}
}
}

View File

@@ -202,9 +202,9 @@ public class DistributedStateMachine<S, E> extends LifecycleObjectSupport implem
&& stateContext.getTransition().getKind() == TransitionKind.INTERNAL
&& ObjectUtils.nullSafeEquals(delegate.getId(),
stateContext.getMessageHeader(StateMachineSystemConstants.STATEMACHINE_IDENTIFIER))) {
StateMachineContext<S, E> xxx = ensemble.getState();
StateMachineContext<S, E> current = ensemble.getState();
ensemble.setState(new DefaultStateMachineContext<S, E>(
xxx.getState(), stateContext.getEvent(), stateContext
current.getState(), stateContext.getEvent(), stateContext
.getMessageHeaders(), stateContext.getStateMachine().getExtendedState()));
}
return stateContext;
@@ -262,6 +262,14 @@ public class DistributedStateMachine<S, E> extends LifecycleObjectSupport implem
}
}
@Override
public void ensembleError(StateMachineEnsembleException exception) {
// TODO: when we get support for sm error handling,
// propagate this exception there
log.error("Ensemble error", exception);
throw exception;
}
}
}

View File

@@ -51,4 +51,11 @@ public interface EnsembleListeger<S, E> {
*/
void stateChanged(StateMachineContext<S, E> context);
/**
* Called when {@link StateMachineEnsemble} resulted an error.
*
* @param exception the exception
*/
void ensembleError(StateMachineEnsembleException exception);
}

View File

@@ -0,0 +1,70 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.statemachine.ensemble;
import java.io.IOException;
import org.springframework.statemachine.StateMachineException;
/**
* General exception indicating a problem in ensemble.
*
* @author Janne Valkealahti
*
*/
public class StateMachineEnsembleException extends StateMachineException {
private static final long serialVersionUID = 960498044587123343L;
/**
* Instantiates a new state machine ensemble exception.
*
* @param e the e
*/
public StateMachineEnsembleException(IOException e) {
super(e);
}
/**
* Instantiates a new state machine ensemble exception.
*
* @param message the message
* @param e the e
*/
public StateMachineEnsembleException(String message, Exception e) {
super(message, e);
}
/**
* Instantiates a new state machine ensemble exception.
*
* @param message the message
* @param cause the cause
*/
public StateMachineEnsembleException(String message, Throwable cause) {
super(message, cause);
}
/**
* Instantiates a new state machine ensemble exception.
*
* @param message the message
*/
public StateMachineEnsembleException(String message) {
super(message);
}
}

View File

@@ -59,6 +59,10 @@ public abstract class StateMachineEnsembleObjectSupport<S, E> extends LifecycleO
ensembleListener.stateMachineLeft(stateMachine, context);
}
protected void notifyError(StateMachineEnsembleException exception) {
ensembleListener.ensembleError(exception);
}
protected void notifyStateChanged(StateMachineContext<S, E> context) {
if (log.isTraceEnabled()) {
log.trace("Notify notifyStateChanged " + context);

View File

@@ -38,6 +38,7 @@ import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.StateMachineContext;
import org.springframework.statemachine.StateMachineException;
import org.springframework.statemachine.ensemble.StateMachineEnsemble;
import org.springframework.statemachine.ensemble.StateMachineEnsembleException;
import org.springframework.statemachine.ensemble.StateMachineEnsembleObjectSupport;
import org.springframework.statemachine.ensemble.StateMachinePersist;
@@ -261,7 +262,7 @@ public class ZookeeperStateMachineEnsemble<S, E> extends StateMachineEnsembleObj
/**
* Register existing {@link CuratorWatcher} for a state path.
*/
private void registerWatcherForStatePath() {
protected void registerWatcherForStatePath() {
try {
if (curatorClient.getState() != CuratorFrameworkState.STOPPED) {
curatorClient.checkExists().usingWatcher(watcher).forPath(statePath);
@@ -328,6 +329,13 @@ public class ZookeeperStateMachineEnsemble<S, E> extends StateMachineEnsembleObj
StateMachineContext<S, E> context = ((ZookeeperStateMachinePersist<S, E>) persist)
.readLog(i, stat);
int ver = (stat.getVersion() - 1) * logSize + (i + 1);
// check if we're behind more than a log size meaning we can't
// replay full history, notify and break out from a loop
if (i + logSize < ver) {
notifyError(new StateMachineEnsembleException("Current version behind more that log size"));
break;
}
if (log.isDebugEnabled()) {
log.debug("Replay position " + i + " with version " + ver);
log.debug("Context in position " + i + " " + context);

View File

@@ -0,0 +1,94 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.statemachine.zookeeper;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import org.springframework.util.ReflectionUtils;
/**
* Utils for tests.
*
* @author Janne Valkealahti
*
*/
public class TestUtils {
@SuppressWarnings("unchecked")
public static <T> T readField(String name, Object target) throws Exception {
Field field = null;
Class<?> clazz = target.getClass();
do {
try {
field = clazz.getDeclaredField(name);
} catch (Exception ex) {
}
clazz = clazz.getSuperclass();
} while (field == null && !clazz.equals(Object.class));
if (field == null)
throw new IllegalArgumentException("Cannot find field '" + name + "' in the class hierarchy of "
+ target.getClass());
field.setAccessible(true);
return (T) field.get(target);
}
@SuppressWarnings("unchecked")
public static <T> T callMethod(String name, Object target) throws Exception {
Class<?> clazz = target.getClass();
Method method = ReflectionUtils.findMethod(clazz, name);
if (method == null)
throw new IllegalArgumentException("Cannot find method '" + method + "' in the class hierarchy of "
+ target.getClass());
method.setAccessible(true);
return (T) ReflectionUtils.invokeMethod(method, target);
}
public static void setField(String name, Object target, Object value) throws Exception {
Field field = null;
Class<?> clazz = target.getClass();
do {
try {
field = clazz.getDeclaredField(name);
} catch (Exception ex) {
}
clazz = clazz.getSuperclass();
} while (field == null && !clazz.equals(Object.class));
if (field == null)
throw new IllegalArgumentException("Cannot find field '" + name + "' in the class hierarchy of "
+ target.getClass());
field.setAccessible(true);
field.set(target, value);
}
@SuppressWarnings("unchecked")
public static <T> T callMethod(String name, Object target, Object[] args, Class<?>[] argsTypes) throws Exception {
Class<?> clazz = target.getClass();
Method method = ReflectionUtils.findMethod(clazz, name, argsTypes);
if (method == null)
throw new IllegalArgumentException("Cannot find method '" + method + "' in the class hierarchy of "
+ target.getClass());
method.setAccessible(true);
return (T) ReflectionUtils.invokeMethod(method, target, args);
}
}

View File

@@ -37,6 +37,7 @@ import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.StateMachineContext;
import org.springframework.statemachine.access.StateMachineAccessor;
import org.springframework.statemachine.ensemble.EnsembleListeger;
import org.springframework.statemachine.ensemble.StateMachineEnsembleException;
import org.springframework.statemachine.listener.StateMachineListener;
import org.springframework.statemachine.state.State;
import org.springframework.statemachine.support.DefaultExtendedState;
@@ -260,6 +261,58 @@ public class ZookeeperStateMachineEnsembleTests extends AbstractZookeeperTests {
}
}
@Test
public void testEventsOverflow() throws Exception {
context.register(ZkServerConfig.class, BaseConfig.class);
context.refresh();
CuratorFramework curatorClient =
context.getBean("curatorClient", CuratorFramework.class);
OverflowControlZookeeperStateMachineEnsemble ensemble =
new OverflowControlZookeeperStateMachineEnsemble(curatorClient, "/foo", true, 4);
TestEnsembleListener listener = new TestEnsembleListener();
ensemble.addEnsembleListener(listener);
ensemble.afterPropertiesSet();
ensemble.start();
listener.reset(0, 10, 1);
ensemble.enabled = false;
for (int i = 0; i < 5; i++) {
ensemble.setState(new DefaultStateMachineContext<String, String>("S" + i, "E" + i,
new HashMap<String, Object>(), new DefaultExtendedState()));
}
ensemble.enabled = true;
TestUtils.callMethod("registerWatcherForStatePath", ensemble);
assertThat(listener.errors.size(), is(0));
for (int i = 5; i < 6; i++) {
ensemble.setState(new DefaultStateMachineContext<String, String>("S" + i, "E" + i,
new HashMap<String, Object>(), new DefaultExtendedState()));
}
assertThat(listener.errorLatch.await(2, TimeUnit.SECONDS), is(true));
}
private class OverflowControlZookeeperStateMachineEnsemble extends ZookeeperStateMachineEnsemble<String, String> {
boolean enabled = true;
public OverflowControlZookeeperStateMachineEnsemble(CuratorFramework curatorClient, String basePath,
boolean cleanState, int logSize) {
super(curatorClient, basePath, cleanState, logSize);
}
@Override
protected void registerWatcherForStatePath() {
if (enabled) {
super.registerWatcherForStatePath();
}
}
}
@Override
protected AnnotationConfigApplicationContext buildContext() {
return new AnnotationConfigApplicationContext();
@@ -269,6 +322,8 @@ public class ZookeeperStateMachineEnsembleTests extends AbstractZookeeperTests {
volatile CountDownLatch joinedLatch = new CountDownLatch(1);
volatile CountDownLatch eventLatch = new CountDownLatch(1);
volatile CountDownLatch errorLatch = new CountDownLatch(1);
volatile List<Exception> errors = new ArrayList<Exception>();
volatile List<StateMachineContext<String, String>> events = new ArrayList<StateMachineContext<String,String>>();
@Override
@@ -286,10 +341,22 @@ public class ZookeeperStateMachineEnsembleTests extends AbstractZookeeperTests {
eventLatch.countDown();
}
@Override
public void ensembleError(StateMachineEnsembleException exception) {
errors.add(exception);
errorLatch.countDown();
}
public void reset(int c1, int c2) {
reset(c1, c2, 0);
}
public void reset(int c1, int c2, int c3) {
joinedLatch = new CountDownLatch(c1);
eventLatch = new CountDownLatch(c2);
errorLatch = new CountDownLatch(c3);
events.clear();
errors.clear();
}
}