The TaskScheduler for the MessageBus is now created in MessageBusParser if no explicit reference has been provided via the "task-executor" attribute of the <message-bus/> element. The configuration of an asynchronous ApplicationEventMulticaster has also been pushed to the parser rather than being contained within the MessageBus implementation.

This commit is contained in:
Mark Fisher
2008-10-09 13:45:07 +00:00
parent 977596272c
commit 49361dba5c
15 changed files with 161 additions and 146 deletions

View File

@@ -31,13 +31,15 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageHandlingException;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.Schedulers;
import org.springframework.integration.scheduling.SimpleTaskScheduler;
import org.springframework.integration.scheduling.TaskScheduler;
/**
@@ -45,13 +47,17 @@ import org.springframework.integration.scheduling.TaskScheduler;
*/
public class AggregatorEndpointTests {
private final TaskScheduler taskScheduler = Schedulers.createDefaultTaskScheduler(10);
private TaskExecutor taskExecutor;
private TaskScheduler taskScheduler;
private AbstractMessageAggregator aggregator;
@Before
public void configureAggregator() {
this.taskExecutor = new SimpleAsyncTaskExecutor();
this.taskScheduler = new SimpleTaskScheduler(taskExecutor);
this.aggregator = new TestAggregator();
this.aggregator.setTaskScheduler(this.taskScheduler);
this.taskScheduler.start();
@@ -65,9 +71,9 @@ public class AggregatorEndpointTests {
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
CountDownLatch latch = new CountDownLatch(3);
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message1, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message2, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message3, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message1, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message2, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message3, latch));
latch.await(1000, TimeUnit.MILLISECONDS);
Message<?> reply = replyChannel.receive(500);
assertNotNull(reply);
@@ -84,7 +90,7 @@ public class AggregatorEndpointTests {
Message<?> message = createMessage("123", "ABC", 2, 1, replyChannel);
CountDownLatch latch = new CountDownLatch(1);
AggregatorTestTask task = new AggregatorTestTask(this.aggregator, message, latch);
this.taskScheduler.execute(task);
this.taskExecutor.execute(task);
latch.await(2000, TimeUnit.MILLISECONDS);
assertEquals("task should have completed within timeout", 0, latch.getCount());
Message<?> reply = replyChannel.receive(0);
@@ -105,8 +111,8 @@ public class AggregatorEndpointTests {
CountDownLatch latch = new CountDownLatch(2);
AggregatorTestTask task1 = new AggregatorTestTask(this.aggregator, message1, latch);
AggregatorTestTask task2 = new AggregatorTestTask(this.aggregator, message2, latch);
this.taskScheduler.execute(task1);
this.taskScheduler.execute(task2);
this.taskExecutor.execute(task1);
this.taskExecutor.execute(task2);
latch.await(3000, TimeUnit.MILLISECONDS);
assertEquals("handlers should have been invoked within time limit", 0, latch.getCount());
Message<?> reply = replyChannel.receive(3000);
@@ -127,12 +133,12 @@ public class AggregatorEndpointTests {
Message<?> message5 = createMessage("def", "XYZ", 3, 2, replyChannel2);
Message<?> message6 = createMessage("ghi", "XYZ", 3, 3, replyChannel2);
CountDownLatch latch = new CountDownLatch(6);
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message1, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message6, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message2, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message5, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message3, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message4, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message1, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message6, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message2, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message5, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message3, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message4, latch));
latch.await(1000, TimeUnit.MILLISECONDS);
Message<?> reply1 = replyChannel1.receive(500);
assertNotNull(reply1);
@@ -202,10 +208,10 @@ public class AggregatorEndpointTests {
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
Message<?> message4 = createMessage("abc", "ABC", 3, 3, replyChannel);
CountDownLatch latch = new CountDownLatch(4);
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message1, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message2, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message3, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message4, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message1, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message2, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message3, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message4, latch));
latch.await(1000, TimeUnit.MILLISECONDS);
Message<?> reply = replyChannel.receive(500);
assertNotNull(reply);
@@ -222,11 +228,11 @@ public class AggregatorEndpointTests {
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
CountDownLatch latch = new CountDownLatch(3);
AggregatorTestTask task1 = new AggregatorTestTask(aggregator, message1, latch);
this.taskScheduler.execute(task1);
this.taskExecutor.execute(task1);
AggregatorTestTask task2 = new AggregatorTestTask(aggregator, message2, latch);
this.taskScheduler.execute(task2);
this.taskExecutor.execute(task2);
AggregatorTestTask task3 = new AggregatorTestTask(aggregator, message3, latch);
this.taskScheduler.execute(task3);
this.taskExecutor.execute(task3);
latch.await(1000, TimeUnit.MILLISECONDS);
assertNull(task1.getException());
assertNull(task2.getException());

View File

@@ -23,12 +23,13 @@ import static org.junit.Assert.assertNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.scheduling.Schedulers;
import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.integration.util.TestUtils;
/**
* @author Marius Bogoevici
@@ -43,7 +44,7 @@ public class ResequencerTests {
@Before
public void configureResequencer() {
this.resequencer = new Resequencer();
this.taskScheduler = Schedulers.createDefaultTaskScheduler(10);
this.taskScheduler = TestUtils.createTaskScheduler(10);
this.resequencer.setTaskScheduler(taskScheduler);
taskScheduler.start();
this.resequencer.start();

View File

@@ -4,7 +4,14 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="messageBus" class="org.springframework.integration.bus.DefaultMessageBus"/>
<bean id="messageBus" class="org.springframework.integration.bus.DefaultMessageBus">
<property name="taskScheduler">
<bean class="org.springframework.integration.util.TestUtils"
factory-method="createTaskScheduler">
<constructor-arg value="10"/>
</bean>
</property>
</bean>
<bean id="testChannel" class="org.springframework.integration.channel.QueueChannel"/>

View File

@@ -46,6 +46,7 @@ import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.IntervalTrigger;
import org.springframework.integration.util.TestUtils;
/**
* @author Mark Fisher
@@ -74,6 +75,7 @@ public class DefaultMessageBusTests {
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
context.refresh();
DefaultMessageBus bus = new DefaultMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
consumer.setChannelRegistry(bus);
bus.start();
@@ -86,6 +88,7 @@ public class DefaultMessageBusTests {
public void channelsWithoutHandlers() {
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus bus = new DefaultMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
QueueChannel sourceChannel = new QueueChannel();
sourceChannel.setBeanName("sourceChannel");
@@ -144,6 +147,7 @@ public class DefaultMessageBusTests {
context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1);
context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2);
DefaultMessageBus bus = new DefaultMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
bus.start();
inputChannel.send(new StringMessage("testing"));
@@ -187,6 +191,7 @@ public class DefaultMessageBusTests {
context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1);
context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2);
DefaultMessageBus bus = new DefaultMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
bus.start();
inputChannel.send(new StringMessage("testing"));
@@ -214,6 +219,7 @@ public class DefaultMessageBusTests {
channelAdapter.setBeanName("testChannel");
context.getBeanFactory().registerSingleton("testChannel", channelAdapter);
DefaultMessageBus bus = new DefaultMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
bus.start();
latch.await(2000, TimeUnit.MILLISECONDS);
@@ -257,6 +263,7 @@ public class DefaultMessageBusTests {
endpoint.afterPropertiesSet();
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
DefaultMessageBus bus = new DefaultMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
bus.start();
errorChannel.send(new ErrorMessage(new RuntimeException("test-exception")));

View File

@@ -4,7 +4,14 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="bus" class="org.springframework.integration.bus.DefaultMessageBus"/>
<bean id="bus" class="org.springframework.integration.bus.DefaultMessageBus">
<property name="taskScheduler">
<bean class="org.springframework.integration.util.TestUtils"
factory-method="createTaskScheduler">
<constructor-arg value="10"/>
</bean>
</property>
</bean>
<bean id="sourceChannel" class="org.springframework.integration.channel.QueueChannel"/>

View File

@@ -37,8 +37,8 @@ import org.springframework.integration.bus.MessageBusInterceptorTests;
import org.springframework.integration.bus.TestMessageBusAwareImpl;
import org.springframework.integration.bus.TestMessageBusStartInterceptor;
import org.springframework.integration.bus.TestMessageBusStopInterceptor;
import org.springframework.integration.scheduling.SimpleTaskScheduler;
import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* @author Mark Fisher
@@ -144,7 +144,7 @@ public class MessageBusParserTests {
context.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME);
DirectFieldAccessor accessor = new DirectFieldAccessor(multicaster);
Object taskExecutor = accessor.getPropertyValue("taskExecutor");
assertEquals(SimpleTaskScheduler.class, taskExecutor.getClass());
assertEquals(ThreadPoolTaskExecutor.class, taskExecutor.getClass());
}
@Test

View File

@@ -52,6 +52,7 @@ import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.IntervalTrigger;
import org.springframework.integration.scheduling.Trigger;
import org.springframework.integration.util.TestUtils;
/**
* @author Mark Fisher
@@ -144,6 +145,7 @@ public class MessagingAnnotationPostProcessorTests {
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
@@ -209,6 +211,7 @@ public class MessagingAnnotationPostProcessorTests {
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
@@ -235,6 +238,7 @@ public class MessagingAnnotationPostProcessorTests {
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
@@ -259,6 +263,7 @@ public class MessagingAnnotationPostProcessorTests {
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
@@ -285,6 +290,7 @@ public class MessagingAnnotationPostProcessorTests {
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
@@ -309,6 +315,7 @@ public class MessagingAnnotationPostProcessorTests {
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
@@ -333,6 +340,7 @@ public class MessagingAnnotationPostProcessorTests {
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
@@ -378,6 +386,7 @@ public class MessagingAnnotationPostProcessorTests {
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
@@ -412,6 +421,7 @@ public class MessagingAnnotationPostProcessorTests {
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());

View File

@@ -16,7 +16,12 @@
package org.springframework.integration.util;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.integration.scheduling.SimpleTaskScheduler;
import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
/**
@@ -51,4 +56,12 @@ public abstract class TestUtils {
return (T) value;
}
public static TaskScheduler createTaskScheduler(int poolSize) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(poolSize);
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
executor.afterPropertiesSet();
return new SimpleTaskScheduler(executor);
}
}