The Message Bus no longer creates a default TaskScheduler. However, a default thread-pool version is created in the MessageBusParser if no "task-scheduler" reference is provided.

This commit is contained in:
Mark Fisher
2008-10-09 16:03:10 +00:00
parent b8e03f05c0
commit b6d22a7644
10 changed files with 26 additions and 11 deletions

View File

@@ -36,6 +36,7 @@ import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessagingException;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
/**
* @author Mark Fisher
@@ -55,6 +56,7 @@ public class DirectChannelSubscriptionTests {
targetChannel.setBeanName("targetChannel");
bus.registerChannel(sourceChannel);
bus.registerChannel(targetChannel);
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
}

View File

@@ -24,6 +24,7 @@ import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.util.TestUtils;
/**
* @author Marius Bogoevici
@@ -33,6 +34,7 @@ public class MessageBusInterceptorTests {
@Test
public void testStart() {
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(new GenericApplicationContext());
TestMessageBusStartInterceptor startInterceptor = new TestMessageBusStartInterceptor();
TestMessageBusStopInterceptor stopInterceptor = new TestMessageBusStopInterceptor();

View File

@@ -36,6 +36,7 @@ import org.springframework.integration.endpoint.PollingConsumerEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
/**
* @author Mark Fisher
@@ -60,6 +61,7 @@ public class MessageChannelTemplateTests {
context.getBeanFactory().registerSingleton("requestChannel", requestChannel);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
DefaultMessageBus bus = new DefaultMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
bus.start();
}

View File

@@ -33,6 +33,7 @@ import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
/**
* @author Mark Fisher
@@ -48,7 +49,9 @@ public class ServiceActivatorAnnotationPostProcessorTests {
beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(latch);
context.registerBeanDefinition("testBean", beanDefinition);
String busBeanName = MessageBusParser.MESSAGE_BUS_BEAN_NAME;
context.registerBeanDefinition(busBeanName, new RootBeanDefinition(DefaultMessageBus.class));
RootBeanDefinition busBeanDefinition = new RootBeanDefinition(DefaultMessageBus.class);
busBeanDefinition.getPropertyValues().addPropertyValue("taskScheduler", TestUtils.createTaskScheduler(10));
context.registerBeanDefinition(busBeanName, busBeanDefinition);
RootBeanDefinition postProcessorDef = new RootBeanDefinition(MessagingAnnotationPostProcessor.class);
context.registerBeanDefinition("postProcessor", postProcessorDef);
context.refresh();

View File

@@ -30,6 +30,7 @@ import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.MessageBusParser;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
/**
* @author Mark Fisher
@@ -48,6 +49,7 @@ public class RouterAnnotationPostProcessorTests {
@Before
public void init() {
messageBus.setApplicationContext(context);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
inputChannel.setBeanName("input");
outputChannel.setBeanName("output");
context.getBeanFactory().registerSingleton("input", inputChannel);

View File

@@ -32,6 +32,7 @@ import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.MessageBusParser;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
/**
* @author Mark Fisher
@@ -55,6 +56,7 @@ public class SplitterAnnotationPostProcessorTests {
context.getBeanFactory().registerSingleton("output", outputChannel);
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
}

View File

@@ -7,7 +7,7 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
<beans:bean id="messageBus" class="org.springframework.integration.bus.DefaultMessageBus"/>
<message-bus/>
<channel id="testChannel">
<queue capacity="50"/>

View File

@@ -31,6 +31,7 @@ import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.PollingConsumerEndpoint;
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
import org.springframework.integration.util.TestUtils;
/**
* @author Mark Fisher
@@ -87,6 +88,7 @@ public class MethodInvokingConsumerTests {
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(serivceActivator, channel);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
DefaultMessageBus bus = new DefaultMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
bus.start();
String result = queue.poll(1000, TimeUnit.MILLISECONDS);