MessageBus is now an interface. The DefaultMessageBus class is the implementation.

This commit is contained in:
Mark Fisher
2008-07-06 22:09:07 +00:00
parent bfd3d5392b
commit 2afcf4c490
21 changed files with 61 additions and 56 deletions

View File

@@ -4,7 +4,7 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="messageBus" class="org.springframework.integration.bus.MessageBus"/>
<bean id="messageBus" class="org.springframework.integration.bus.DefaultMessageBus"/>
<bean id="testChannel" class="org.springframework.integration.channel.QueueChannel"/>

View File

@@ -43,11 +43,11 @@ import org.springframework.integration.scheduling.PollingSchedule;
/**
* @author Mark Fisher
*/
public class MessageBusTests {
public class DefaultMessageBusTests {
@Test
public void testRegistrationWithInputChannelReference() {
MessageBus bus = new MessageBus();
DefaultMessageBus bus = new DefaultMessageBus();
MessageChannel sourceChannel = new QueueChannel();
MessageChannel targetChannel = new QueueChannel();
bus.registerChannel("sourceChannel", sourceChannel);
@@ -69,7 +69,7 @@ public class MessageBusTests {
@Test
public void testRegistrationWithInputChannelName() {
MessageBus bus = new MessageBus();
MessageBus bus = new DefaultMessageBus();
MessageChannel sourceChannel = new QueueChannel();
MessageChannel targetChannel = new QueueChannel();
bus.registerChannel("sourceChannel", sourceChannel);
@@ -91,7 +91,7 @@ public class MessageBusTests {
@Test
public void testChannelsWithoutHandlers() {
MessageBus bus = new MessageBus();
MessageBus bus = new DefaultMessageBus();
MessageChannel sourceChannel = new QueueChannel();
sourceChannel.send(new StringMessage("123", "test"));
MessageChannel targetChannel = new QueueChannel();
@@ -133,7 +133,7 @@ public class MessageBusTests {
return message;
}
};
MessageBus bus = new MessageBus();
MessageBus bus = new DefaultMessageBus();
bus.registerChannel("input", inputChannel);
bus.registerChannel("output1", outputChannel1);
bus.registerChannel("output2", outputChannel2);
@@ -167,7 +167,7 @@ public class MessageBusTests {
return message;
}
};
MessageBus bus = new MessageBus();
MessageBus bus = new DefaultMessageBus();
bus.registerChannel("input", inputChannel);
bus.registerChannel("output1", outputChannel1);
bus.registerChannel("output2", outputChannel2);
@@ -186,7 +186,7 @@ public class MessageBusTests {
@Test
public void testErrorChannelWithFailedDispatch() throws InterruptedException {
MessageBus bus = new MessageBus();
MessageBus bus = new DefaultMessageBus();
CountDownLatch latch = new CountDownLatch(1);
SourceEndpoint sourceEndpoint = new SourceEndpoint(new FailingSource(latch));
sourceEndpoint.setOutputChannel(new QueueChannel());
@@ -210,7 +210,7 @@ public class MessageBusTests {
@Test
public void testErrorChannelRegistration() {
MessageChannel errorChannel = new QueueChannel();
MessageBus bus = new MessageBus();
DefaultMessageBus bus = new DefaultMessageBus();
bus.setErrorChannel(errorChannel);
assertEquals(errorChannel, bus.getErrorChannel());
}
@@ -218,7 +218,7 @@ public class MessageBusTests {
@Test
public void testHandlerSubscribedToErrorChannel() throws InterruptedException {
MessageChannel errorChannel = new QueueChannel();
MessageBus bus = new MessageBus();
DefaultMessageBus bus = new DefaultMessageBus();
bus.setErrorChannel(errorChannel);
final CountDownLatch latch = new CountDownLatch(1);
MessageHandler handler = new MessageHandler() {
@@ -241,6 +241,7 @@ public class MessageBusTests {
assertTrue(messageBusAwareBean.getMessageBus() == context.getBean("bus"));
}
private static class FailingSource implements MessageSource<Object> {
private CountDownLatch latch;

View File

@@ -39,7 +39,7 @@ import org.springframework.integration.message.StringMessage;
*/
public class DirectChannelSubscriptionTests {
private MessageBus bus = new MessageBus();
private DefaultMessageBus bus = new DefaultMessageBus();
private MessageChannel sourceChannel = new DirectChannel();

View File

@@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
/**
@@ -30,7 +31,7 @@ public class MessageBusInterceptorTests {
@Test
public void testStart() {
MessageBus messageBus = new MessageBus();
DefaultMessageBus messageBus = new DefaultMessageBus();
TestMessageBusStartInterceptor startInterceptor = new TestMessageBusStartInterceptor();
TestMessageBusStopInterceptor stopInterceptor = new TestMessageBusStopInterceptor();
// add all interceptors

View File

@@ -4,7 +4,7 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="bus" class="org.springframework.integration.bus.MessageBus"/>
<bean id="bus" class="org.springframework.integration.bus.DefaultMessageBus"/>
<bean id="sourceChannel" class="org.springframework.integration.channel.QueueChannel"/>

View File

@@ -32,7 +32,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.integration.bus.DefaultChannelFactoryBean;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.ChannelInterceptor;
import org.springframework.integration.channel.DispatcherPolicy;
@@ -107,11 +107,11 @@ public class ChannelFactoryTests {
@Test
public void testDefaultChannelFactoryBean() throws Exception{
MessageBus messageBus = new MessageBus();
DefaultMessageBus messageBus = new DefaultMessageBus();
ChannelFactory channelFactory = new StubChannelFactory();
messageBus.setChannelFactory(channelFactory);
StaticApplicationContext applicationContext = new StaticApplicationContext();
BeanDefinitionBuilder messageBusDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(MessageBus.class);
BeanDefinitionBuilder messageBusDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(DefaultMessageBus.class);
messageBusDefinitionBuilder.getBeanDefinition().getPropertyValues().addPropertyValue("channelFactory", channelFactory);
applicationContext.registerBeanDefinition("messageBus", messageBusDefinitionBuilder.getBeanDefinition());
DefaultChannelFactoryBean channelFactoryBean = new DefaultChannelFactoryBean(dispatcherPolicy);

View File

@@ -31,6 +31,7 @@ import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.integration.ConfigurationException;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.bus.TestMessageBusAwareImpl;
import org.springframework.integration.bus.interceptor.MessageBusInterceptorTests;
@@ -51,7 +52,7 @@ public class MessageBusParserTests {
public void testErrorChannelReference() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageBusWithErrorChannelReference.xml", this.getClass());
MessageBus bus = (MessageBus) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
DefaultMessageBus bus = (DefaultMessageBus) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
bus.initialize();
assertEquals(context.getBean("testErrorChannel"), bus.getErrorChannel());
}
@@ -60,7 +61,7 @@ public class MessageBusParserTests {
public void testDefaultErrorChannel() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageBusWithDefaults.xml", this.getClass());
MessageBus bus = (MessageBus) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
DefaultMessageBus bus = (DefaultMessageBus) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
bus.initialize();
assertNotNull("bus should have created a default error channel", bus.getErrorChannel());
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2007 the original author or authors.
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,7 +32,7 @@ import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.Subscriber;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.annotation.SubscriberAnnotationPostProcessor;
@@ -52,7 +52,7 @@ public class SubscriberAnnotationPostProcessorTests {
subscriberDef.getConstructorArgumentValues().addGenericArgumentValue(latch);
context.registerBeanDefinition("testBean", subscriberDef);
String busBeanName = MessageBusParser.MESSAGE_BUS_BEAN_NAME;
context.registerBeanDefinition(busBeanName, new RootBeanDefinition(MessageBus.class));
context.registerBeanDefinition(busBeanName, new RootBeanDefinition(DefaultMessageBus.class));
RootBeanDefinition postProcessorDef = new RootBeanDefinition(SubscriberAnnotationPostProcessor.class);
postProcessorDef.getPropertyValues().addPropertyValue("messageBus", new RuntimeBeanReference(busBeanName));
context.registerBeanDefinition("postProcessor", postProcessorDef);
@@ -78,7 +78,7 @@ public class SubscriberAnnotationPostProcessorTests {
subscriberDef.getConstructorArgumentValues().addGenericArgumentValue(latch);
context.registerBeanDefinition("testBean", subscriberDef);
String busBeanName = MessageBusParser.MESSAGE_BUS_BEAN_NAME;
context.registerBeanDefinition(busBeanName, new RootBeanDefinition(MessageBus.class));
context.registerBeanDefinition(busBeanName, new RootBeanDefinition(DefaultMessageBus.class));
RootBeanDefinition postProcessorDef = new RootBeanDefinition(SubscriberAnnotationPostProcessor.class);
postProcessorDef.getPropertyValues().addPropertyValue("messageBus", new RuntimeBeanReference(busBeanName));
postProcessorDef.getPropertyValues().addPropertyValue("subscriberAnnotationType", CustomSubscriberAnnotation.class);

View File

@@ -47,6 +47,7 @@ import org.springframework.integration.annotation.MessageTarget;
import org.springframework.integration.annotation.Polled;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.ChannelRegistryAware;
@@ -68,7 +69,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testHandlerAnnotation() {
MessageBus messageBus = new MessageBus();
MessageBus messageBus = new DefaultMessageBus();
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.afterPropertiesSet();
HandlerAnnotatedBean bean = new HandlerAnnotatedBean();
@@ -78,7 +79,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testCustomHandlerAnnotation() {
MessageBus messageBus = new MessageBus();
MessageBus messageBus = new DefaultMessageBus();
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.afterPropertiesSet();
CustomHandlerAnnotatedBean bean = new CustomHandlerAnnotatedBean();
@@ -158,7 +159,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testTargetAnnotation() throws InterruptedException {
MessageBus messageBus = new MessageBus();
MessageBus messageBus = new DefaultMessageBus();
QueueChannel testChannel = new QueueChannel();
messageBus.registerChannel("testChannel", testChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
@@ -176,7 +177,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testConcurrencyAnnotationWithValues() {
MessageBus messageBus = new MessageBus();
MessageBus messageBus = new DefaultMessageBus();
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.afterPropertiesSet();
ConcurrencyAnnotationTestBean testBean = new ConcurrencyAnnotationTestBean();
@@ -203,7 +204,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testChannelRegistryAwareBean() {
MessageBus messageBus = new MessageBus();
MessageBus messageBus = new DefaultMessageBus();
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.afterPropertiesSet();
ChannelRegistryAwareTestBean testBean = new ChannelRegistryAwareTestBean();
@@ -216,7 +217,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testProxiedMessageEndpointAnnotation() {
MessageBus messageBus = new MessageBus();
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setAutoCreateChannels(true);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.afterPropertiesSet();
@@ -233,7 +234,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInherited() {
MessageBus messageBus = new MessageBus();
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setAutoCreateChannels(true);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.afterPropertiesSet();
@@ -248,7 +249,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInheritedWithProxy() {
MessageBus messageBus = new MessageBus();
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setAutoCreateChannels(true);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.afterPropertiesSet();
@@ -265,7 +266,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInheritedFromInterface() {
MessageBus messageBus = new MessageBus();
MessageBus messageBus = new DefaultMessageBus();
MessageChannel inputChannel = new QueueChannel();
MessageChannel outputChannel = new QueueChannel();
messageBus.registerChannel("inputChannel", inputChannel);
@@ -281,7 +282,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInheritedFromInterfaceWithAutoCreatedChannels() {
MessageBus messageBus = new MessageBus();
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setAutoCreateChannels(true);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.afterPropertiesSet();
@@ -296,7 +297,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInheritedFromInterfaceWithProxy() {
MessageBus messageBus = new MessageBus();
MessageBus messageBus = new DefaultMessageBus();
MessageChannel inputChannel = new QueueChannel();
MessageChannel outputChannel = new QueueChannel();
messageBus.registerChannel("inputChannel", inputChannel);
@@ -314,7 +315,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testSplitterAnnotation() throws InterruptedException {
MessageBus messageBus = new MessageBus();
MessageBus messageBus = new DefaultMessageBus();
QueueChannel input = new QueueChannel();
QueueChannel output = new QueueChannel();
messageBus.registerChannel("input", input);
@@ -342,7 +343,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test(expected=ConfigurationException.class)
public void testEndpointWithNoHandlerMethod() {
MessageBus messageBus = new MessageBus();
MessageBus messageBus = new DefaultMessageBus();
QueueChannel testChannel = new QueueChannel();
messageBus.registerChannel("testChannel", testChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
@@ -353,7 +354,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testEndpointWithPolledAnnotation() {
MessageBus messageBus = new MessageBus();
MessageBus messageBus = new DefaultMessageBus();
QueueChannel testChannel = new QueueChannel();
messageBus.registerChannel("testChannel", testChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
@@ -372,7 +373,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageSourceAnnotation() {
MessageBus messageBus = new MessageBus();
MessageBus messageBus = new DefaultMessageBus();
QueueChannel testChannel = new QueueChannel();
messageBus.registerChannel("testChannel", testChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
@@ -387,7 +388,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testHandlerWithTransformers() {
MessageBus messageBus = new MessageBus();
MessageBus messageBus = new DefaultMessageBus();
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.afterPropertiesSet();
HandlerWithTransformers testBean = new HandlerWithTransformers();

View File

@@ -7,7 +7,7 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<bean id="bus" class="org.springframework.integration.bus.MessageBus"/>
<bean id="bus" class="org.springframework.integration.bus.DefaultMessageBus"/>
<integration:channel id="inputChannel"/>

View File

@@ -4,7 +4,7 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="bus" class="org.springframework.integration.bus.MessageBus">
<bean id="bus" class="org.springframework.integration.bus.DefaultMessageBus">
<property name="autoCreateChannels" value="true"/>
</bean>

View File

@@ -7,7 +7,7 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<bean id="bus" class="org.springframework.integration.bus.MessageBus"/>
<bean id="bus" class="org.springframework.integration.bus.DefaultMessageBus"/>
<integration:channel id="inputChannel"/>

View File

@@ -7,7 +7,7 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<beans:bean class="org.springframework.integration.bus.MessageBus"/>
<beans:bean class="org.springframework.integration.bus.DefaultMessageBus"/>
<queue-channel id="testChannel" capacity="50"/>

View File

@@ -9,6 +9,6 @@
<integration:message-bus/>
<bean id="bus" class="org.springframework.integration.bus.MessageBus"/>
<bean id="bus" class="org.springframework.integration.bus.DefaultMessageBus"/>
</beans>

View File

@@ -7,7 +7,7 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<beans:bean class="org.springframework.integration.bus.MessageBus"/>
<beans:bean class="org.springframework.integration.bus.DefaultMessageBus"/>
<queue-channel id="testChannel" capacity="50"/>

View File

@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.handler.MessageHandler;
@@ -48,7 +49,7 @@ public class RequestReplyTemplateTests {
return new StringMessage(message.getPayload().toString().toUpperCase());
}
};
MessageBus bus = new MessageBus();
MessageBus bus = new DefaultMessageBus();
bus.registerChannel("requestChannel", requestChannel);
bus.registerHandler("testHandler", testHandler, requestChannel, null);
bus.start();

View File

@@ -27,9 +27,9 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.handler.MethodInvokingTarget;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessagingException;
@@ -90,7 +90,7 @@ public class MethodInvokingTargetTests {
Message<String> message = new GenericMessage<String>("123", "testing");
channel.send(message);
assertNull(queue.poll());
MessageBus bus = new MessageBus();
MessageBus bus = new DefaultMessageBus();
bus.registerChannel("channel", channel);
bus.registerHandler("targetAdapter", target, channel, null);
bus.start();

View File

@@ -4,7 +4,7 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="bus" class="org.springframework.integration.bus.MessageBus">
<bean id="bus" class="org.springframework.integration.bus.DefaultMessageBus">
<property name="autoStartup" value="false"/>
</bean>