dispatcher tests added
This commit is contained in:
@@ -16,43 +16,165 @@
|
||||
|
||||
package org.springframework.integration.dispatcher;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.easymock.EasyMock.*;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.easymock.IAnswer;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.integration.endpoint.HandlerEndpoint;
|
||||
import org.springframework.integration.handler.MessageHandler;
|
||||
import org.springframework.integration.handler.TestHandlers;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageTarget;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
* @author Iwein Fuld
|
||||
*/
|
||||
public class BroadcastingDispatcherTests {
|
||||
|
||||
private BroadcastingDispatcher dispatcher;
|
||||
|
||||
private TaskExecutor taskExecutorMock = createMock(TaskExecutor.class);
|
||||
|
||||
private Message<?> messageMock = createMock(Message.class);
|
||||
|
||||
private MessageTarget targetMock = createMock(MessageTarget.class);
|
||||
|
||||
private Object[] globalMocks = new Object[] { messageMock, taskExecutorMock, targetMock };
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
dispatcher = new BroadcastingDispatcher();
|
||||
dispatcher.setTaskExecutor(taskExecutorMock);
|
||||
reset(globalMocks);
|
||||
defaultTaskExecutorMock();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testPublishSubscribe() throws InterruptedException {
|
||||
BroadcastingDispatcher dispatcher = new BroadcastingDispatcher();
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final AtomicInteger counter1 = new AtomicInteger();
|
||||
final AtomicInteger counter2 = new AtomicInteger();
|
||||
dispatcher.addTarget(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch)));
|
||||
dispatcher.addTarget(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch)));
|
||||
dispatcher.send(new StringMessage("test"));
|
||||
latch.await(500, TimeUnit.MILLISECONDS);
|
||||
assertEquals(0, latch.getCount());
|
||||
assertEquals(1, counter1.get());
|
||||
assertEquals(1, counter2.get());
|
||||
public void publishSubcribe() throws Exception {
|
||||
dispatcher.setTaskExecutor(null);
|
||||
dispatcher.addTarget(targetMock);
|
||||
dispatcher.addTarget(targetMock);
|
||||
expect(targetMock.send(messageMock)).andReturn(true).times(2);
|
||||
replay(globalMocks);
|
||||
dispatcher.send(messageMock);
|
||||
verify(globalMocks);
|
||||
}
|
||||
|
||||
|
||||
private static MessageTarget createEndpoint(MessageHandler handler) {
|
||||
return new HandlerEndpoint(handler);
|
||||
@Test
|
||||
public void multipleTargetsWithExecutor() {
|
||||
// should the same target be allowed to be added twice?
|
||||
dispatcher.addTarget(targetMock);
|
||||
dispatcher.addTarget(targetMock);
|
||||
dispatcher.addTarget(targetMock);
|
||||
expect(targetMock.send(messageMock)).andReturn(true).times(3);
|
||||
replay(globalMocks);
|
||||
dispatcher.send(messageMock);
|
||||
verify(globalMocks);
|
||||
}
|
||||
|
||||
}
|
||||
@Test
|
||||
public void multipleTargetsPartialFailure() {
|
||||
reset(taskExecutorMock);
|
||||
dispatcher.addTarget(targetMock);
|
||||
dispatcher.addTarget(targetMock);
|
||||
dispatcher.addTarget(targetMock);
|
||||
partialFailingExecutorMock(true, false, true);
|
||||
expect(targetMock.send(messageMock)).andReturn(true).times(2);
|
||||
replay(globalMocks);
|
||||
dispatcher.send(messageMock);
|
||||
verify(globalMocks);
|
||||
}
|
||||
|
||||
@Test(timeout = 100)
|
||||
public void multipleTargetsPartialTimout() throws Exception {
|
||||
reset(taskExecutorMock);
|
||||
dispatcher.addTarget(targetMock);
|
||||
dispatcher.addTarget(targetMock);
|
||||
dispatcher.addTarget(targetMock);
|
||||
dispatcher.setTimeout(50);
|
||||
// three threads invoking targets
|
||||
final CountDownLatch latch = new CountDownLatch(3);
|
||||
threadedExecutorMock(3);
|
||||
final AtomicBoolean timingOutStarted = new AtomicBoolean(false);
|
||||
final AtomicBoolean testNotTimedOut = new AtomicBoolean(false);
|
||||
|
||||
expect(targetMock.send(messageMock)).andAnswer(new IAnswer<Boolean>() {
|
||||
public Boolean answer() throws Throwable {
|
||||
latch.countDown();
|
||||
return true;
|
||||
}
|
||||
}).times(2);
|
||||
/*
|
||||
* Watch out, this is tricky. The send() method will be invoked but due
|
||||
* to the faked time out it will never return. Therefore the expectation
|
||||
* needs to be there, but during the verify it will be called 0 times.
|
||||
* This is something that EasyMock doesn't support so I've worked around
|
||||
* it with an AtomicBoolean and a latch. It isn't pretty, but it sort of works
|
||||
*/
|
||||
expect(targetMock.send(messageMock)).andAnswer(new IAnswer<Boolean>() {
|
||||
public Boolean answer() throws Throwable {
|
||||
// this should happen
|
||||
timingOutStarted.compareAndSet(false,true);
|
||||
latch.countDown();
|
||||
// cause timeout here
|
||||
Thread.sleep(1000);
|
||||
testNotTimedOut.compareAndSet(false, true);
|
||||
//fail("There is a bug in this Test");
|
||||
//in a long running suite this will run until the end, but the test will already be over
|
||||
return null;
|
||||
}
|
||||
}).anyTimes();
|
||||
replay(globalMocks);
|
||||
dispatcher.send(messageMock);
|
||||
latch.await();
|
||||
verify(globalMocks);
|
||||
assertFalse("Test not timed out properly", testNotTimedOut.get());
|
||||
assertTrue("Timing out Runnable not executed", timingOutStarted.get());
|
||||
}
|
||||
|
||||
private void defaultTaskExecutorMock() {
|
||||
taskExecutorMock.execute(isA(Runnable.class));
|
||||
expectLastCall().andAnswer(new IAnswer<Object>() {
|
||||
public Object answer() throws Throwable {
|
||||
((Runnable) getCurrentArguments()[0]).run();
|
||||
return null;
|
||||
}
|
||||
}).anyTimes();
|
||||
}
|
||||
|
||||
/*
|
||||
* runs the runnable based on the array of passes
|
||||
*/
|
||||
private void partialFailingExecutorMock(boolean... passes) {
|
||||
taskExecutorMock.execute(isA(Runnable.class));
|
||||
for (final boolean pass : passes)
|
||||
expectLastCall().andAnswer(new IAnswer<Object>() {
|
||||
public Object answer() throws Throwable {
|
||||
if (pass) {
|
||||
((Runnable) getCurrentArguments()[0]).run();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* expect count calls to the taskExecutorMock.execute and have them run the runnable
|
||||
* in a new Thread.
|
||||
*/
|
||||
private void threadedExecutorMock(int count) {
|
||||
taskExecutorMock.execute(isA(Runnable.class));
|
||||
expectLastCall().andAnswer(new IAnswer<Object>() {
|
||||
public Object answer() throws Throwable {
|
||||
final Runnable runnable = (Runnable) getCurrentArguments()[0];
|
||||
new Thread(runnable).start();
|
||||
return null;
|
||||
}
|
||||
}).times(count);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,111 @@
|
||||
package org.springframework.integration.dispatcher;
|
||||
|
||||
import static org.easymock.EasyMock.createMock;
|
||||
import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.reset;
|
||||
import static org.easymock.EasyMock.verify;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.integration.message.BlockingSource;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.scheduling.Schedule;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Iwein Fuld
|
||||
*
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public class PollingDispatcherTest {
|
||||
|
||||
private PollingDispatcher pollingDispatcher;
|
||||
private Schedule scheduleMock = createMock(Schedule.class);
|
||||
private MessageDispatcher dispatcherMock = createMock(MessageDispatcher.class);
|
||||
private BlockingSource sourceMock = createMock(BlockingSource.class);
|
||||
private Message messageMock = createMock(Message.class);
|
||||
private Object[] globalMocks = new Object[] { scheduleMock, dispatcherMock,
|
||||
sourceMock, messageMock };
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
pollingDispatcher = new PollingDispatcher(sourceMock, dispatcherMock,
|
||||
scheduleMock);
|
||||
pollingDispatcher.setReceiveTimeout(-1);
|
||||
reset(globalMocks);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void singleMessage() {
|
||||
expect(sourceMock.receive()).andReturn(messageMock);
|
||||
expect(dispatcherMock.send(messageMock)).andReturn(true);
|
||||
replay(globalMocks);
|
||||
pollingDispatcher.run();
|
||||
verify(globalMocks);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void multipleMessages() {
|
||||
expect(sourceMock.receive()).andReturn(messageMock).times(5);
|
||||
expect(dispatcherMock.send(messageMock)).andReturn(true).times(5);
|
||||
replay(globalMocks);
|
||||
pollingDispatcher.setMaxMessagesPerTask(5);
|
||||
pollingDispatcher.run();
|
||||
verify(globalMocks);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void multipleMessages_underrun() {
|
||||
expect(sourceMock.receive()).andReturn(messageMock).times(5);
|
||||
expect(sourceMock.receive()).andReturn(null);
|
||||
expect(dispatcherMock.send(messageMock)).andReturn(true).times(5);
|
||||
replay(globalMocks);
|
||||
pollingDispatcher.setMaxMessagesPerTask(6);
|
||||
pollingDispatcher.run();
|
||||
verify(globalMocks);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void droppedMessage() {
|
||||
expect(sourceMock.receive()).andReturn(messageMock);
|
||||
expect(dispatcherMock.send(messageMock)).andReturn(false);
|
||||
replay(globalMocks);
|
||||
pollingDispatcher.run();
|
||||
verify(globalMocks);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void droppedMessage_onePerPoll() {
|
||||
expect(sourceMock.receive()).andReturn(messageMock).times(1);
|
||||
expect(dispatcherMock.send(messageMock)).andReturn(false).anyTimes();
|
||||
replay(globalMocks);
|
||||
pollingDispatcher.setMaxMessagesPerTask(10);
|
||||
pollingDispatcher.run();
|
||||
verify(globalMocks);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void blockingSourceTimedOut() {
|
||||
pollingDispatcher = new PollingDispatcher(sourceMock, dispatcherMock,
|
||||
scheduleMock);
|
||||
// we don't need to await the timeout, returning null suffices
|
||||
expect(sourceMock.receive(1)).andReturn(null);
|
||||
replay(globalMocks);
|
||||
pollingDispatcher.setReceiveTimeout(1);
|
||||
pollingDispatcher.run();
|
||||
verify(globalMocks);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void blockingSourceNotTimedOut() {
|
||||
pollingDispatcher = new PollingDispatcher(sourceMock, dispatcherMock,
|
||||
scheduleMock);
|
||||
expect(sourceMock.receive(1)).andReturn(messageMock);
|
||||
expect(dispatcherMock.send(messageMock)).andReturn(false);
|
||||
replay(globalMocks);
|
||||
pollingDispatcher.setReceiveTimeout(1);
|
||||
pollingDispatcher.run();
|
||||
verify(globalMocks);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user