Removed EndpointRegistry. DefaultMessageBus now delegates to the ApplicationContext for all endpoint lookups, and the annotation-based post-processing registers singletons with the context rather than going through the DefaultMessageBus.

This commit is contained in:
Mark Fisher
2008-09-18 22:42:08 +00:00
parent fe14643a8d
commit eb5884a7d6
18 changed files with 226 additions and 187 deletions

View File

@@ -28,6 +28,7 @@ import org.junit.Test;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
@@ -49,16 +50,16 @@ public class DefaultMessageBusTests {
@Test
public void testRegistrationWithInputChannelReference() {
DefaultMessageBus bus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
QueueChannel sourceChannel = new QueueChannel();
QueueChannel targetChannel = new QueueChannel();
sourceChannel.setBeanName("sourceChannel");
targetChannel.setBeanName("targetChannel");
bus.registerChannel(sourceChannel);
context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel);
context.getBeanFactory().registerSingleton("targetChannel", targetChannel);
Message<String> message = MessageBuilder.withPayload("test")
.setReturnAddress("targetChannel").build();
sourceChannel.send(message);
bus.registerChannel(targetChannel);
AbstractInOutEndpoint endpoint = new AbstractInOutEndpoint() {
public Message<?> handle(Message<?> message) {
return message;
@@ -66,7 +67,10 @@ public class DefaultMessageBusTests {
};
endpoint.setBeanName("testEndpoint");
endpoint.setInputChannel(sourceChannel);
bus.registerEndpoint(endpoint);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
context.refresh();
DefaultMessageBus bus = new DefaultMessageBus();
bus.setApplicationContext(context);
bus.start();
Message<?> result = targetChannel.receive(3000);
assertEquals("test", result.getPayload());
@@ -75,14 +79,16 @@ public class DefaultMessageBusTests {
@Test
public void testChannelsWithoutHandlers() {
MessageBus bus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus bus = new DefaultMessageBus();
bus.setApplicationContext(context);
QueueChannel sourceChannel = new QueueChannel();
sourceChannel.setBeanName("sourceChannel");
context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel);
sourceChannel.send(new StringMessage("test"));
QueueChannel targetChannel = new QueueChannel();
targetChannel.setBeanName("targetChannel");
bus.registerChannel(sourceChannel);
bus.registerChannel(targetChannel);
context.getBeanFactory().registerSingleton("targetChannel", targetChannel);
bus.start();
Message<?> result = targetChannel.receive(100);
assertNull(result);
@@ -104,6 +110,7 @@ public class DefaultMessageBusTests {
@Test
public void testExactlyOneHandlerReceivesPointToPointMessage() {
GenericApplicationContext context = new GenericApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel1 = new QueueChannel();
QueueChannel outputChannel2 = new QueueChannel();
@@ -117,21 +124,22 @@ public class DefaultMessageBusTests {
return MessageBuilder.fromMessage(message).build();
}
};
MessageBus bus = new DefaultMessageBus();
inputChannel.setBeanName("input");
outputChannel1.setBeanName("output1");
outputChannel2.setBeanName("output2");
bus.registerChannel(inputChannel);
bus.registerChannel(outputChannel1);
bus.registerChannel(outputChannel2);
context.getBeanFactory().registerSingleton("input", inputChannel);
context.getBeanFactory().registerSingleton("output1", outputChannel1);
context.getBeanFactory().registerSingleton("output2", outputChannel2);
endpoint1.setBeanName("testEndpoint1");
endpoint1.setInputChannel(inputChannel);
endpoint1.setOutputChannel(outputChannel1);
endpoint2.setBeanName("testEndpoint2");
endpoint2.setInputChannel(inputChannel);
endpoint2.setOutputChannel(outputChannel2);
bus.registerEndpoint(endpoint1);
bus.registerEndpoint(endpoint2);
context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1);
context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2);
DefaultMessageBus bus = new DefaultMessageBus();
bus.setApplicationContext(context);
bus.start();
inputChannel.send(new StringMessage("testing"));
Message<?> message1 = outputChannel1.receive(500);
@@ -142,6 +150,7 @@ public class DefaultMessageBusTests {
@Test
public void testBothHandlersReceivePublishSubscribeMessage() throws InterruptedException {
GenericApplicationContext context = new GenericApplicationContext();
PublishSubscribeChannel inputChannel = new PublishSubscribeChannel();
QueueChannel outputChannel1 = new QueueChannel();
QueueChannel outputChannel2 = new QueueChannel();
@@ -160,21 +169,22 @@ public class DefaultMessageBusTests {
return reply;
}
};
MessageBus bus = new DefaultMessageBus();
inputChannel.setBeanName("input");
outputChannel1.setBeanName("output1");
outputChannel2.setBeanName("output2");
bus.registerChannel(inputChannel);
bus.registerChannel(outputChannel1);
bus.registerChannel(outputChannel2);
context.getBeanFactory().registerSingleton("input", inputChannel);
context.getBeanFactory().registerSingleton("output1", outputChannel1);
context.getBeanFactory().registerSingleton("output2", outputChannel2);
endpoint1.setBeanName("testEndpoint1");
endpoint1.setInputChannel(inputChannel);
endpoint1.setOutputChannel(outputChannel1);
endpoint2.setBeanName("testEndpoint2");
endpoint2.setInputChannel(inputChannel);
endpoint2.setOutputChannel(outputChannel2);
bus.registerEndpoint(endpoint1);
bus.registerEndpoint(endpoint2);
context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1);
context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2);
DefaultMessageBus bus = new DefaultMessageBus();
bus.setApplicationContext(context);
bus.start();
inputChannel.send(new StringMessage("testing"));
latch.await(500, TimeUnit.MILLISECONDS);
@@ -188,18 +198,20 @@ public class DefaultMessageBusTests {
@Test
public void testErrorChannelWithFailedDispatch() throws InterruptedException {
MessageBus bus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
QueueChannel errorChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
errorChannel.setBeanName("errorChannel");
bus.registerChannel(errorChannel);
context.getBeanFactory().registerSingleton("errorChannel", errorChannel);
CountDownLatch latch = new CountDownLatch(1);
SourcePollingChannelAdapter channelAdapter = new SourcePollingChannelAdapter();
channelAdapter.setSource(new FailingSource(latch));
channelAdapter.setSchedule(new PollingSchedule(1000));
channelAdapter.setOutputChannel(outputChannel);
channelAdapter.setBeanName("testChannel");
bus.registerEndpoint(channelAdapter);
context.getBeanFactory().registerSingleton("testChannel", channelAdapter);
DefaultMessageBus bus = new DefaultMessageBus();
bus.setApplicationContext(context);
bus.start();
latch.await(2000, TimeUnit.MILLISECONDS);
Message<?> message = errorChannel.receive(5000);
@@ -227,10 +239,10 @@ public class DefaultMessageBusTests {
@Test
public void testHandlerSubscribedToErrorChannel() throws InterruptedException {
DefaultMessageBus bus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
QueueChannel errorChannel = new QueueChannel();
errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME);
bus.registerChannel(errorChannel);
context.getBeanFactory().registerSingleton(ChannelRegistry.ERROR_CHANNEL_NAME, errorChannel);
final CountDownLatch latch = new CountDownLatch(1);
AbstractInOutEndpoint endpoint = new AbstractInOutEndpoint() {
public Message<?> handle(Message<?> message) {
@@ -240,7 +252,9 @@ public class DefaultMessageBusTests {
};
endpoint.setBeanName("testEndpoint");
endpoint.setInputChannel(errorChannel);
bus.registerEndpoint(endpoint);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
DefaultMessageBus bus = new DefaultMessageBus();
bus.setApplicationContext(context);
bus.start();
errorChannel.send(new ErrorMessage(new RuntimeException("test-exception")));
latch.await(1000, TimeUnit.MILLISECONDS);

View File

@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.ChannelRegistry;
@@ -59,12 +60,14 @@ public class DirectChannelSubscriptionTests {
@Test
public void testSendAndReceiveForRegisteredEndpoint() {
GenericApplicationContext context = new GenericApplicationContext();
MethodInvoker invoker = new MessageMappingMethodInvoker(new TestBean(), "handle");
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(invoker);
endpoint.setInputChannel(sourceChannel);
endpoint.setOutputChannel(targetChannel);
endpoint.setBeanName("testEndpoint");
bus.registerEndpoint(endpoint);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
bus.setApplicationContext(context);
bus.start();
this.sourceChannel.send(new StringMessage("foo"));
Message<?> response = this.targetChannel.receive();
@@ -74,7 +77,10 @@ public class DirectChannelSubscriptionTests {
@Test
public void testSendAndReceiveForAnnotatedEndpoint() {
GenericApplicationContext context = new GenericApplicationContext();
bus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(bus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
TestEndpoint endpoint = new TestEndpoint();
postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint");
@@ -105,10 +111,13 @@ public class DirectChannelSubscriptionTests {
@Test(expected=MessagingException.class)
public void testExceptionThrownFromAnnotatedEndpoint() {
GenericApplicationContext context = new GenericApplicationContext();
bus.setApplicationContext(context);
QueueChannel errorChannel = new QueueChannel();
errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME);
bus.registerChannel(errorChannel);
context.getBeanFactory().registerSingleton(ChannelRegistry.ERROR_CHANNEL_NAME, errorChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(bus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
FailingTestEndpoint endpoint = new FailingTestEndpoint();
postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint");

View File

@@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
@@ -32,6 +33,7 @@ public class MessageBusInterceptorTests {
@Test
public void testStart() {
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(new GenericApplicationContext());
TestMessageBusStartInterceptor startInterceptor = new TestMessageBusStartInterceptor();
TestMessageBusStopInterceptor stopInterceptor = new TestMessageBusStopInterceptor();
// add all interceptors

View File

@@ -48,7 +48,7 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests
MessageBus bus = (MessageBus) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
bus.start();
assertNotNull(bus.lookupChannel(beanName));
Object adapter = bus.lookupEndpoint(beanName + ".adapter");
Object adapter = this.applicationContext.getBean(beanName + ".adapter");
assertNotNull(adapter);
assertTrue(adapter instanceof OutboundChannelAdapter);
TestTarget target = (TestTarget) this.applicationContext.getBean("target");
@@ -68,7 +68,7 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests
MessageBus bus = (MessageBus) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
bus.start();
assertNotNull(bus.lookupChannel(beanName));
Object adapter = bus.lookupEndpoint(beanName + ".adapter");
Object adapter = this.applicationContext.getBean(beanName + ".adapter");
assertNotNull(adapter);
assertTrue(adapter instanceof OutboundChannelAdapter);
TestBean testBean = (TestBean) this.applicationContext.getBean("testBean");
@@ -86,7 +86,7 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests
PollableChannel channel = (PollableChannel) this.applicationContext.getBean("queueChannel");
MessageBus bus = (MessageBus) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
assertNull(bus.lookupChannel(beanName));
Object adapter = bus.lookupEndpoint(beanName);
Object adapter = this.applicationContext.getBean(beanName);
assertNotNull(adapter);
assertTrue(adapter instanceof SourcePollingChannelAdapter);
TestBean testBean = (TestBean) this.applicationContext.getBean("testBean");

View File

@@ -90,8 +90,7 @@ public class AggregatorAnnotationTests {
@SuppressWarnings("unchecked")
private DirectFieldAccessor getDirectFieldAccessorForAggregatingHandler(ApplicationContext context, final String endpointName) {
MessageBus messageBus = this.getMessageBus(context);
AggregatorEndpoint endpoint = (AggregatorEndpoint) messageBus.lookupEndpoint(endpointName + ".aggregator");
AggregatorEndpoint endpoint = (AggregatorEndpoint) context.getBean(endpointName + ".aggregatingMethod.aggregator");
return new DirectFieldAccessor(endpoint);
}

View File

@@ -31,13 +31,13 @@ import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.ChannelAdapter;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.ChannelRegistryAware;
import org.springframework.integration.channel.DirectChannel;
@@ -60,8 +60,11 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testHandlerAnnotation() {
MessageBus messageBus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
HandlerAnnotatedBean bean = new HandlerAnnotatedBean();
Object result = postProcessor.postProcessAfterInitialization(bean, "testBean");
@@ -128,8 +131,11 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testTargetAnnotation() throws InterruptedException {
MessageBus messageBus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
CountDownLatch latch = new CountDownLatch(1);
TargetAnnotationTestBean testBean = new TargetAnnotationTestBean(latch);
@@ -150,11 +156,14 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testChannelRegistryAwareBean() {
MessageBus messageBus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
QueueChannel inputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
messageBus.registerChannel(inputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ChannelRegistryAwareTestBean testBean = new ChannelRegistryAwareTestBean();
assertNull(testBean.getChannelRegistry());
@@ -166,14 +175,17 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testProxiedMessageEndpointAnnotation() {
DefaultMessageBus messageBus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
messageBus.registerChannel(inputChannel);
messageBus.registerChannel(outputChannel);
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpoint());
Object proxy = proxyFactory.getProxy();
@@ -187,14 +199,17 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInherited() {
DefaultMessageBus messageBus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
messageBus.registerChannel(inputChannel);
messageBus.registerChannel(outputChannel);
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointSubclass(), "subclass");
messageBus.start();
@@ -206,14 +221,17 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInheritedWithProxy() {
DefaultMessageBus messageBus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
messageBus.registerChannel(inputChannel);
messageBus.registerChannel(outputChannel);
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpointSubclass());
Object proxy = proxyFactory.getProxy();
@@ -227,14 +245,17 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInheritedFromInterface() {
MessageBus messageBus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
messageBus.registerChannel(inputChannel);
messageBus.registerChannel(outputChannel);
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl");
messageBus.start();
@@ -246,14 +267,17 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInheritedFromInterfaceWithAutoCreatedChannels() {
DefaultMessageBus messageBus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
messageBus.registerChannel(inputChannel);
messageBus.registerChannel(outputChannel);
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl");
messageBus.start();
@@ -265,14 +289,17 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInheritedFromInterfaceWithProxy() {
MessageBus messageBus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
messageBus.registerChannel(inputChannel);
messageBus.registerChannel(outputChannel);
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpointImplementation());
Object proxy = proxyFactory.getProxy();
@@ -286,15 +313,19 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testEndpointWithPollerAnnotation() {
MessageBus messageBus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
QueueChannel testChannel = new QueueChannel();
testChannel.setBeanName("testChannel");
messageBus.registerChannel(testChannel);
context.getBeanFactory().registerSingleton("testChannel", testChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
AnnotatedEndpointWithPolledAnnotation endpoint = new AnnotatedEndpointWithPolledAnnotation();
postProcessor.postProcessAfterInitialization(endpoint, "testBean");
ServiceActivatorEndpoint processedEndpoint = (ServiceActivatorEndpoint) messageBus.lookupEndpoint("testBean.serviceActivator");
ServiceActivatorEndpoint processedEndpoint =
(ServiceActivatorEndpoint) context.getBean("testBean.prependFoo.serviceActivator");
processedEndpoint.afterPropertiesSet();
DirectFieldAccessor accessor = new DirectFieldAccessor(processedEndpoint);
ChannelPoller poller = (ChannelPoller) accessor.getPropertyValue("poller");
@@ -309,8 +340,11 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testChannelAdapterAnnotation() throws InterruptedException {
MessageBus messageBus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ChannelAdapterAnnotationTestBean testBean = new ChannelAdapterAnnotationTestBean();
postProcessor.postProcessAfterInitialization(testBean, "testBean");
@@ -333,8 +367,11 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testTransformer() {
MessageBus messageBus = new DefaultMessageBus();
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
TransformerAnnotationTestBean testBean = new TransformerAnnotationTestBean();
org.springframework.integration.transformer.Transformer transformer =

View File

@@ -21,10 +21,10 @@ import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
@@ -35,32 +35,33 @@ import org.springframework.integration.message.StringMessage;
*/
public class RouterAnnotationPostProcessorTests {
private MessageBus messageBus;
private GenericApplicationContext context = new GenericApplicationContext();
private DirectChannel inputChannel;
private DefaultMessageBus messageBus = new DefaultMessageBus();
private QueueChannel outputChannel;
private DirectChannel inputChannel = new DirectChannel();
private QueueChannel outputChannel = new QueueChannel();
@Before
public void init() {
inputChannel = new DirectChannel();
outputChannel = new QueueChannel();
messageBus.setApplicationContext(context);
inputChannel.setBeanName("input");
outputChannel.setBeanName("output");
messageBus = new DefaultMessageBus();
messageBus.registerChannel(inputChannel);
messageBus.registerChannel(outputChannel);
context.getBeanFactory().registerSingleton("input", inputChannel);
context.getBeanFactory().registerSingleton("output", outputChannel);
}
@Test
public void testRouter() {
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
messageBus.start();
TestRouter testRouter = new TestRouter();
postProcessor.postProcessAfterInitialization(testRouter, "test");
messageBus.start();
inputChannel.send(new StringMessage("foo"));
Message<?> replyMessage = outputChannel.receive(0);
assertEquals("foo", replyMessage.getPayload());

View File

@@ -23,10 +23,10 @@ import static org.junit.Assert.assertNull;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
@@ -37,28 +37,29 @@ import org.springframework.integration.message.StringMessage;
*/
public class SplitterAnnotationPostProcessorTests {
private MessageBus messageBus;
private GenericApplicationContext context = new GenericApplicationContext();
private DirectChannel inputChannel;
private DefaultMessageBus messageBus = new DefaultMessageBus();
private QueueChannel outputChannel;
private DirectChannel inputChannel = new DirectChannel();
private QueueChannel outputChannel = new QueueChannel();
@Before
public void init() {
inputChannel = new DirectChannel();
outputChannel = new QueueChannel();
inputChannel.setBeanName("input");
outputChannel.setBeanName("output");
messageBus = new DefaultMessageBus();
messageBus.registerChannel(inputChannel);
messageBus.registerChannel(outputChannel);
context.getBeanFactory().registerSingleton("input", inputChannel);
context.getBeanFactory().registerSingleton("output", outputChannel);
messageBus.setApplicationContext(context);
}
@Test
public void testSplitterAnnotation() throws InterruptedException {
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
TestSplitter splitter = new TestSplitter();
postProcessor.postProcessAfterInitialization(splitter, "testSplitter");

View File

@@ -27,9 +27,9 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.ConfigurationException;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
import org.springframework.integration.message.GenericMessage;
@@ -78,21 +78,23 @@ public class MethodInvokingTargetTests {
@Test
public void testSubscription() throws Exception {
GenericApplicationContext context = new GenericApplicationContext();
SynchronousQueue<String> queue = new SynchronousQueue<String>();
TestBean testBean = new TestBean(queue);
MethodInvokingTarget target = new MethodInvokingTarget(testBean, "foo");
target.afterPropertiesSet();
QueueChannel channel = new QueueChannel();
channel.setBeanName("channel");
context.getBeanFactory().registerSingleton("channel", channel);
Message<String> message = new GenericMessage<String>("testing");
channel.send(message);
assertNull(queue.poll());
MessageBus bus = new DefaultMessageBus();
bus.registerChannel(channel);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(target);
endpoint.setBeanName("testEndpoint");
endpoint.setInputChannel(channel);
bus.registerEndpoint(endpoint);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
DefaultMessageBus bus = new DefaultMessageBus();
bus.setApplicationContext(context);
bus.start();
String result = queue.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(result);

View File

@@ -27,8 +27,8 @@ import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.AbstractInOutEndpoint;
@@ -50,11 +50,13 @@ public class MessageChannelTemplateTests {
return new StringMessage(message.getPayload().toString().toUpperCase());
}
};
MessageBus bus = new DefaultMessageBus();
bus.registerChannel(requestChannel);
endpoint.setBeanName("testEndpoint");
endpoint.setInputChannel(requestChannel);
bus.registerEndpoint(endpoint);
endpoint.setInputChannel(requestChannel);
GenericApplicationContext context = new GenericApplicationContext();
context.getBeanFactory().registerSingleton("requestChannel", requestChannel);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
DefaultMessageBus bus = new DefaultMessageBus();
bus.setApplicationContext(context);
bus.start();
}