diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/CompositeEnsembleListener.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/CompositeEnsembleListener.java index 22ac3f1f..f185e11a 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/CompositeEnsembleListener.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/CompositeEnsembleListener.java @@ -56,4 +56,12 @@ public class CompositeEnsembleListener extends AbstractCompositeListener> iterator = getListeners().reverse(); iterator.hasNext();) { + EnsembleListeger listener = iterator.next(); + listener.ensembleError(exception); + } + } + } diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/DistributedStateMachine.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/DistributedStateMachine.java index 48cc095f..b54e0a41 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/DistributedStateMachine.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/DistributedStateMachine.java @@ -202,9 +202,9 @@ public class DistributedStateMachine extends LifecycleObjectSupport implem && stateContext.getTransition().getKind() == TransitionKind.INTERNAL && ObjectUtils.nullSafeEquals(delegate.getId(), stateContext.getMessageHeader(StateMachineSystemConstants.STATEMACHINE_IDENTIFIER))) { - StateMachineContext xxx = ensemble.getState(); + StateMachineContext current = ensemble.getState(); ensemble.setState(new DefaultStateMachineContext( - xxx.getState(), stateContext.getEvent(), stateContext + current.getState(), stateContext.getEvent(), stateContext .getMessageHeaders(), stateContext.getStateMachine().getExtendedState())); } return stateContext; @@ -262,6 +262,14 @@ public class DistributedStateMachine extends LifecycleObjectSupport implem } } + @Override + public void ensembleError(StateMachineEnsembleException exception) { + // TODO: when we get support for sm error handling, + // propagate this exception there + log.error("Ensemble error", exception); + throw exception; + } + } } diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/EnsembleListeger.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/EnsembleListeger.java index 7b5b60d5..7bd88811 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/EnsembleListeger.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/EnsembleListeger.java @@ -51,4 +51,11 @@ public interface EnsembleListeger { */ void stateChanged(StateMachineContext context); + /** + * Called when {@link StateMachineEnsemble} resulted an error. + * + * @param exception the exception + */ + void ensembleError(StateMachineEnsembleException exception); + } diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/StateMachineEnsembleException.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/StateMachineEnsembleException.java new file mode 100644 index 00000000..e50fb449 --- /dev/null +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/StateMachineEnsembleException.java @@ -0,0 +1,70 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.statemachine.ensemble; + +import java.io.IOException; + +import org.springframework.statemachine.StateMachineException; + +/** + * General exception indicating a problem in ensemble. + * + * @author Janne Valkealahti + * + */ +public class StateMachineEnsembleException extends StateMachineException { + + private static final long serialVersionUID = 960498044587123343L; + + /** + * Instantiates a new state machine ensemble exception. + * + * @param e the e + */ + public StateMachineEnsembleException(IOException e) { + super(e); + } + + /** + * Instantiates a new state machine ensemble exception. + * + * @param message the message + * @param e the e + */ + public StateMachineEnsembleException(String message, Exception e) { + super(message, e); + } + + /** + * Instantiates a new state machine ensemble exception. + * + * @param message the message + * @param cause the cause + */ + public StateMachineEnsembleException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Instantiates a new state machine ensemble exception. + * + * @param message the message + */ + public StateMachineEnsembleException(String message) { + super(message); + } + +} diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/StateMachineEnsembleObjectSupport.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/StateMachineEnsembleObjectSupport.java index cedfb99b..8554408c 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/StateMachineEnsembleObjectSupport.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/ensemble/StateMachineEnsembleObjectSupport.java @@ -59,6 +59,10 @@ public abstract class StateMachineEnsembleObjectSupport extends LifecycleO ensembleListener.stateMachineLeft(stateMachine, context); } + protected void notifyError(StateMachineEnsembleException exception) { + ensembleListener.ensembleError(exception); + } + protected void notifyStateChanged(StateMachineContext context) { if (log.isTraceEnabled()) { log.trace("Notify notifyStateChanged " + context); diff --git a/spring-statemachine-zookeeper/src/main/java/org/springframework/statemachine/zookeeper/ZookeeperStateMachineEnsemble.java b/spring-statemachine-zookeeper/src/main/java/org/springframework/statemachine/zookeeper/ZookeeperStateMachineEnsemble.java index 104f3817..0849eef1 100644 --- a/spring-statemachine-zookeeper/src/main/java/org/springframework/statemachine/zookeeper/ZookeeperStateMachineEnsemble.java +++ b/spring-statemachine-zookeeper/src/main/java/org/springframework/statemachine/zookeeper/ZookeeperStateMachineEnsemble.java @@ -38,6 +38,7 @@ import org.springframework.statemachine.StateMachine; import org.springframework.statemachine.StateMachineContext; import org.springframework.statemachine.StateMachineException; import org.springframework.statemachine.ensemble.StateMachineEnsemble; +import org.springframework.statemachine.ensemble.StateMachineEnsembleException; import org.springframework.statemachine.ensemble.StateMachineEnsembleObjectSupport; import org.springframework.statemachine.ensemble.StateMachinePersist; @@ -261,7 +262,7 @@ public class ZookeeperStateMachineEnsemble extends StateMachineEnsembleObj /** * Register existing {@link CuratorWatcher} for a state path. */ - private void registerWatcherForStatePath() { + protected void registerWatcherForStatePath() { try { if (curatorClient.getState() != CuratorFrameworkState.STOPPED) { curatorClient.checkExists().usingWatcher(watcher).forPath(statePath); @@ -328,6 +329,13 @@ public class ZookeeperStateMachineEnsemble extends StateMachineEnsembleObj StateMachineContext context = ((ZookeeperStateMachinePersist) persist) .readLog(i, stat); int ver = (stat.getVersion() - 1) * logSize + (i + 1); + + // check if we're behind more than a log size meaning we can't + // replay full history, notify and break out from a loop + if (i + logSize < ver) { + notifyError(new StateMachineEnsembleException("Current version behind more that log size")); + break; + } if (log.isDebugEnabled()) { log.debug("Replay position " + i + " with version " + ver); log.debug("Context in position " + i + " " + context); diff --git a/spring-statemachine-zookeeper/src/test/java/org/springframework/statemachine/zookeeper/TestUtils.java b/spring-statemachine-zookeeper/src/test/java/org/springframework/statemachine/zookeeper/TestUtils.java new file mode 100644 index 00000000..b43f279e --- /dev/null +++ b/spring-statemachine-zookeeper/src/test/java/org/springframework/statemachine/zookeeper/TestUtils.java @@ -0,0 +1,94 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.statemachine.zookeeper; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; + +import org.springframework.util.ReflectionUtils; + +/** + * Utils for tests. + * + * @author Janne Valkealahti + * + */ +public class TestUtils { + + @SuppressWarnings("unchecked") + public static T readField(String name, Object target) throws Exception { + Field field = null; + Class clazz = target.getClass(); + do { + try { + field = clazz.getDeclaredField(name); + } catch (Exception ex) { + } + + clazz = clazz.getSuperclass(); + } while (field == null && !clazz.equals(Object.class)); + + if (field == null) + throw new IllegalArgumentException("Cannot find field '" + name + "' in the class hierarchy of " + + target.getClass()); + field.setAccessible(true); + return (T) field.get(target); + } + + @SuppressWarnings("unchecked") + public static T callMethod(String name, Object target) throws Exception { + Class clazz = target.getClass(); + Method method = ReflectionUtils.findMethod(clazz, name); + + if (method == null) + throw new IllegalArgumentException("Cannot find method '" + method + "' in the class hierarchy of " + + target.getClass()); + method.setAccessible(true); + return (T) ReflectionUtils.invokeMethod(method, target); + } + + public static void setField(String name, Object target, Object value) throws Exception { + Field field = null; + Class clazz = target.getClass(); + do { + try { + field = clazz.getDeclaredField(name); + } catch (Exception ex) { + } + + clazz = clazz.getSuperclass(); + } while (field == null && !clazz.equals(Object.class)); + + if (field == null) + throw new IllegalArgumentException("Cannot find field '" + name + "' in the class hierarchy of " + + target.getClass()); + field.setAccessible(true); + field.set(target, value); + } + + @SuppressWarnings("unchecked") + public static T callMethod(String name, Object target, Object[] args, Class[] argsTypes) throws Exception { + Class clazz = target.getClass(); + Method method = ReflectionUtils.findMethod(clazz, name, argsTypes); + + if (method == null) + throw new IllegalArgumentException("Cannot find method '" + method + "' in the class hierarchy of " + + target.getClass()); + method.setAccessible(true); + return (T) ReflectionUtils.invokeMethod(method, target, args); + } + +} diff --git a/spring-statemachine-zookeeper/src/test/java/org/springframework/statemachine/zookeeper/ZookeeperStateMachineEnsembleTests.java b/spring-statemachine-zookeeper/src/test/java/org/springframework/statemachine/zookeeper/ZookeeperStateMachineEnsembleTests.java index 2600cec2..a1d9a3d9 100644 --- a/spring-statemachine-zookeeper/src/test/java/org/springframework/statemachine/zookeeper/ZookeeperStateMachineEnsembleTests.java +++ b/spring-statemachine-zookeeper/src/test/java/org/springframework/statemachine/zookeeper/ZookeeperStateMachineEnsembleTests.java @@ -37,6 +37,7 @@ import org.springframework.statemachine.StateMachine; import org.springframework.statemachine.StateMachineContext; import org.springframework.statemachine.access.StateMachineAccessor; import org.springframework.statemachine.ensemble.EnsembleListeger; +import org.springframework.statemachine.ensemble.StateMachineEnsembleException; import org.springframework.statemachine.listener.StateMachineListener; import org.springframework.statemachine.state.State; import org.springframework.statemachine.support.DefaultExtendedState; @@ -260,6 +261,58 @@ public class ZookeeperStateMachineEnsembleTests extends AbstractZookeeperTests { } } + @Test + public void testEventsOverflow() throws Exception { + context.register(ZkServerConfig.class, BaseConfig.class); + context.refresh(); + CuratorFramework curatorClient = + context.getBean("curatorClient", CuratorFramework.class); + OverflowControlZookeeperStateMachineEnsemble ensemble = + new OverflowControlZookeeperStateMachineEnsemble(curatorClient, "/foo", true, 4); + + TestEnsembleListener listener = new TestEnsembleListener(); + ensemble.addEnsembleListener(listener); + + ensemble.afterPropertiesSet(); + ensemble.start(); + listener.reset(0, 10, 1); + ensemble.enabled = false; + + for (int i = 0; i < 5; i++) { + ensemble.setState(new DefaultStateMachineContext("S" + i, "E" + i, + new HashMap(), new DefaultExtendedState())); + } + + ensemble.enabled = true; + TestUtils.callMethod("registerWatcherForStatePath", ensemble); + assertThat(listener.errors.size(), is(0)); + + for (int i = 5; i < 6; i++) { + ensemble.setState(new DefaultStateMachineContext("S" + i, "E" + i, + new HashMap(), new DefaultExtendedState())); + } + + assertThat(listener.errorLatch.await(2, TimeUnit.SECONDS), is(true)); + } + + private class OverflowControlZookeeperStateMachineEnsemble extends ZookeeperStateMachineEnsemble { + + boolean enabled = true; + + public OverflowControlZookeeperStateMachineEnsemble(CuratorFramework curatorClient, String basePath, + boolean cleanState, int logSize) { + super(curatorClient, basePath, cleanState, logSize); + } + + @Override + protected void registerWatcherForStatePath() { + if (enabled) { + super.registerWatcherForStatePath(); + } + } + + } + @Override protected AnnotationConfigApplicationContext buildContext() { return new AnnotationConfigApplicationContext(); @@ -269,6 +322,8 @@ public class ZookeeperStateMachineEnsembleTests extends AbstractZookeeperTests { volatile CountDownLatch joinedLatch = new CountDownLatch(1); volatile CountDownLatch eventLatch = new CountDownLatch(1); + volatile CountDownLatch errorLatch = new CountDownLatch(1); + volatile List errors = new ArrayList(); volatile List> events = new ArrayList>(); @Override @@ -286,10 +341,22 @@ public class ZookeeperStateMachineEnsembleTests extends AbstractZookeeperTests { eventLatch.countDown(); } + @Override + public void ensembleError(StateMachineEnsembleException exception) { + errors.add(exception); + errorLatch.countDown(); + } + public void reset(int c1, int c2) { + reset(c1, c2, 0); + } + + public void reset(int c1, int c2, int c3) { joinedLatch = new CountDownLatch(c1); eventLatch = new CountDownLatch(c2); + errorLatch = new CountDownLatch(c3); events.clear(); + errors.clear(); } }