diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java index c388a5b7bb..c14dff1df6 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java @@ -16,7 +16,7 @@ package org.springframework.integration.channel; -import org.springframework.integration.dispatcher.AbstractWinningHandlerDispatcher; +import org.springframework.integration.dispatcher.AbstractHandleOnceDispatcher; import org.springframework.integration.dispatcher.RoundRobinDispatcher; /** @@ -27,13 +27,13 @@ import org.springframework.integration.dispatcher.RoundRobinDispatcher; * @author Mark Fisher * @author Iwein Fuld */ -public class DirectChannel extends AbstractSubscribableChannel { +public class DirectChannel extends AbstractSubscribableChannel { public DirectChannel() { super(new RoundRobinDispatcher()); } - public DirectChannel(AbstractWinningHandlerDispatcher dispatcher){ + public DirectChannel(AbstractHandleOnceDispatcher dispatcher){ super(dispatcher); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractWinningHandlerDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractHandleOnceDispatcher.java similarity index 63% rename from org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractWinningHandlerDispatcher.java rename to org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractHandleOnceDispatcher.java index f49b1cc9e6..038662db8d 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractWinningHandlerDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractHandleOnceDispatcher.java @@ -1,6 +1,6 @@ package org.springframework.integration.dispatcher; -import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.springframework.integration.core.Message; @@ -20,28 +20,25 @@ import org.springframework.integration.message.MessageRejectedException; * MessageRejectedException. *

* The implementations of this class control the order in which the handlers - * will be tried through the getNextHandler method. + * will be tried through the implementation of the {@link #getHandlerIterator(List)} method. * * @author Iwein Fuld * */ -public abstract class AbstractWinningHandlerDispatcher extends AbstractDispatcher { +public abstract class AbstractHandleOnceDispatcher extends AbstractDispatcher { public final boolean dispatch(Message message) { boolean success = false; - List triedHandlers = new ArrayList(); - int size; List handlers = this.getHandlers(); if (handlers.isEmpty()) { throw new MessageDeliveryException(message, "Dispatcher has no subscribers."); } - size = handlers.size(); - for (int i = 0; triedHandlers.size() < size && success == false; i++) { - MessageHandler handler = handlers.get(getNextHandlerIndex(size, i)); + Iterator handlerIterator = getHandlerIterator(handlers); + while (success == false && handlerIterator.hasNext()) { + MessageHandler handler = handlerIterator.next(); if (this.sendMessageToHandler(message, handler)) { - success = true; //we have a winner. + success = true; // we have a winner. } - triedHandlers.add(handler); } if (!success) { throw new MessageRejectedException(message, "All of dispatcher's subscribers rejected Message."); @@ -50,12 +47,11 @@ public abstract class AbstractWinningHandlerDispatcher extends AbstractDispatche } /** - * Return the next handler index. Subclasses have to implement this method - * to determine the order in which handlers will be invoked. - * - * @param size the total number of handlers - * @param loopIndex the current index of the loop + * Return the iterator that will be used to loop over the handlers. This + * allows subclasses to control the order of iteration for each + * {@link #dispatch(Message)} invocation. + * @param handlers + * @return */ - protected abstract int getNextHandlerIndex(int size, int loopIndex); - + protected abstract Iterator getHandlerIterator(List handlers); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/FailOverDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/FailOverDispatcher.java index 9800fdd030..c82a0da634 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/FailOverDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/FailOverDispatcher.java @@ -16,22 +16,24 @@ package org.springframework.integration.dispatcher; +import java.util.Iterator; +import java.util.List; + +import org.springframework.integration.message.MessageHandler; + /** - * {@link AbstractWinningHandlerDispatcher} that will try it's handlers in the + * {@link AbstractHandleOnceDispatcher} that will try it's handlers in the * same order every dispatch. * * @author Mark Fisher * @author Iwein Fuld */ -public class FailOverDispatcher extends AbstractWinningHandlerDispatcher { +public class FailOverDispatcher extends AbstractHandleOnceDispatcher { + - /** - * Returns the current loop index, so each dispatch will try - * the handlers in the same order. - */ @Override - protected int getNextHandlerIndex(int size, int loopIndex) { - return loopIndex; + protected Iterator getHandlerIterator(List handlers) { + return handlers.iterator(); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/RoundRobinDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/RoundRobinDispatcher.java index 2c203e2982..9db155e0ca 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/RoundRobinDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/RoundRobinDispatcher.java @@ -15,25 +15,46 @@ */ package org.springframework.integration.dispatcher; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.springframework.integration.message.MessageHandler; + /** - * Round-robin implementation of {@link AbstractWinningHandlerDispatcher}. This - * implementation will keep track of the index in which its handlers have been - * tried and return a different handler every dispatch. + * Round-robin implementation of {@link AbstractHandleOnceDispatcher}. This + * implementation will keep track of the index of the handler that has been + * tried first and use a different starting handler every dispatch. * * @author Iwein Fuld */ -public class RoundRobinDispatcher extends AbstractWinningHandlerDispatcher { +public class RoundRobinDispatcher extends AbstractHandleOnceDispatcher { private AtomicInteger currentHandlerIndex = new AtomicInteger(); /** - * Keeps track of the last index over multiple dispatch - * invocations. Each invocation of this method will increment the index by - * one, overflowing at size. loopIndex is ignored. + * Returns an iterator that starts at a new point in the list every time the + * first part of the list that is skipped will be used at the end of the + * iteration, so it guarantees all handlers are returned once on subsequent + * next() invocations. */ - protected int getNextHandlerIndex(int size, int loopIndex) { + @Override + protected Iterator getHandlerIterator(List handlers) { + int size = handlers.size(); + int nextHandlerStartIndex = getNextHandlerStartIndex(size); + List reorderedHandlers = new ArrayList(handlers.subList(nextHandlerStartIndex, + size)); + reorderedHandlers.addAll(handlers.subList(0, nextHandlerStartIndex)); + return reorderedHandlers.iterator(); + } + + /** + * Keeps track of the last index over multiple dispatches. Each invocation + * of this method will increment the index by one, overflowing at + * size. + */ + private int getNextHandlerStartIndex(int size) { int indexTail = currentHandlerIndex.getAndIncrement() % size; return indexTail < 0 ? indexTail + size : indexTail; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageHandler.java index 246e2de8c2..9a2f82d8c2 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageHandler.java @@ -22,9 +22,26 @@ import org.springframework.integration.core.Message; * Base interface for any component that handles Messages. * * @author Mark Fisher + * @author Iwein Fuld */ public interface MessageHandler { - void handleMessage(Message message); + /** + * Handles the message if possible. If the handler cannot deal with the + * message this will result in a MessageRejectedException e.g. + * in case of a Selective Consumer. When a consumer tries to handle a + * message, but fails to do so, a MessageDeliveryException is + * thrown. In the first case a caller can decide to try other consumers, in + * the second case it is recommended to treat the message as tainted and go + * into an error scenario. + * + * @param message the message to be handled + * + * @throws MessageRejectedException if the handler doesn't select these + * types of messages + * @throws MessageDeliveryException when something went wrong during the + * handling + */ + void handleMessage(Message message) throws MessageRejectedException, MessageDeliveryException; } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherConcurrentTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherConcurrentTests.java index 993f96b496..1da474aee1 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherConcurrentTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherConcurrentTests.java @@ -1,6 +1,7 @@ package org.springframework.integration.dispatcher; import static org.junit.Assert.*; +import static org.mockito.Mockito.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -13,6 +14,7 @@ import org.mockito.runners.MockitoJUnit44Runner; import org.springframework.integration.core.Message; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageHandler; +import org.springframework.integration.message.MessageRejectedException; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @RunWith(MockitoJUnit44Runner.class) @@ -20,7 +22,7 @@ public class RoundRobinDispatcherConcurrentTests { private static final int TOTAL_EXECUTIONS = 40; - private AbstractWinningHandlerDispatcher dispatcher = new RoundRobinDispatcher(); + private AbstractHandleOnceDispatcher dispatcher = new RoundRobinDispatcher(); private ThreadPoolTaskExecutor scheduler = new ThreadPoolTaskExecutor(); @@ -76,9 +78,13 @@ public class RoundRobinDispatcherConcurrentTests { start.countDown(); allDone.await(); assertFalse("not all messages were accepted", failed.get()); + verify(handler1, times(TOTAL_EXECUTIONS / 4)).handleMessage(message); + verify(handler2, times(TOTAL_EXECUTIONS / 4)).handleMessage(message); + verify(handler3, times(TOTAL_EXECUTIONS / 4)).handleMessage(message); + verify(handler4, times(TOTAL_EXECUTIONS / 4)).handleMessage(message); } - @Test(timeout=2000) + @Test(timeout = 2000) public void unlockOnFailure() throws Exception { // dispatcher has no subscribers (shouldn't lead to deadlock) final CountDownLatch start = new CountDownLatch(1); @@ -97,7 +103,7 @@ public class RoundRobinDispatcherConcurrentTests { fail("this shouldn't happen"); } catch (MessageDeliveryException e) { - //expected + // expected } allDone.countDown(); } @@ -108,4 +114,39 @@ public class RoundRobinDispatcherConcurrentTests { start.countDown(); allDone.await(); } + + @Test + public void noHandlerSkipUnderConcurrentFailure() throws Exception { + dispatcher.addHandler(handler1); + dispatcher.addHandler(handler2); + doThrow(new MessageRejectedException(message)).when(handler1).handleMessage(message); + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch allDone = new CountDownLatch(TOTAL_EXECUTIONS); + final Message message = this.message; + final AtomicBoolean failed = new AtomicBoolean(false); + Runnable messageSenderTask = new Runnable() { + public void run() { + try { + start.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (!dispatcher.dispatch(message)) { + failed.set(true); + } + else { + allDone.countDown(); + } + } + }; + for (int i = 0; i < TOTAL_EXECUTIONS; i++) { + scheduler.execute(messageSenderTask); + } + start.countDown(); + allDone.await(); + assertFalse("not all messages were accepted", failed.get()); + verify(handler1, times(TOTAL_EXECUTIONS/2)).handleMessage(message); + verify(handler2, times(TOTAL_EXECUTIONS)).handleMessage(message); + } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherTests.java index 39bda7b6c9..257e1a8e66 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherTests.java @@ -35,7 +35,7 @@ import org.springframework.integration.message.MessageHandler; @RunWith(MockitoJUnit44Runner.class) public class RoundRobinDispatcherTests { - private AbstractWinningHandlerDispatcher dispatcher = new RoundRobinDispatcher(); + private AbstractHandleOnceDispatcher dispatcher = new RoundRobinDispatcher(); @Mock private MessageHandler handler;