Message Endpoints and the SimpleTaskScheduler now manage their own lifecycles. The ApplicationContextMessageBus is no longer necessary (part of INT-462). The MessagePublishingErrorHandler now detects the default error channel within the beanFactory if necessary (INT-464).

This commit is contained in:
Mark Fisher
2008-11-11 20:11:21 +00:00
parent 7c28053b0f
commit f4ccde6257
52 changed files with 759 additions and 1076 deletions

View File

@@ -26,21 +26,18 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.context.Lifecycle;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.channel.ChannelResolver;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.Message;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.ReplyMessageHolder;
import org.springframework.integration.message.ErrorMessage;
@@ -49,8 +46,8 @@ import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.IntervalTrigger;
import org.springframework.integration.scheduling.SimpleTaskScheduler;
import org.springframework.integration.util.TestUtils;
import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
@@ -59,13 +56,11 @@ public class ApplicationContextMessageBusTests {
@Test
public void endpointRegistrationWithInputChannelReference() {
GenericApplicationContext context = new GenericApplicationContext();
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel sourceChannel = new QueueChannel();
QueueChannel targetChannel = new QueueChannel();
sourceChannel.setBeanName("sourceChannel");
targetChannel.setBeanName("targetChannel");
context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel);
context.getBeanFactory().registerSingleton("targetChannel", targetChannel);
context.registerChannel("sourceChannel", sourceChannel);
context.registerChannel("targetChannel", targetChannel);
Message<String> message = MessageBuilder.withPayload("test")
.setReplyChannelName("targetChannel").build();
sourceChannel.send(message);
@@ -76,37 +71,25 @@ public class ApplicationContextMessageBusTests {
};
handler.setBeanFactory(context);
PollingConsumer endpoint = new PollingConsumer(sourceChannel, handler);
endpoint.afterPropertiesSet();
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
context.registerEndpoint("testEndpoint", endpoint);
context.refresh();
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, bus);
bus.setApplicationContext(context);
bus.start();
Message<?> result = targetChannel.receive(3000);
assertEquals("test", result.getPayload());
bus.stop();
context.stop();
}
@Test
public void channelsWithoutHandlers() {
GenericApplicationContext context = new GenericApplicationContext();
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel sourceChannel = new QueueChannel();
sourceChannel.setBeanName("sourceChannel");
context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel);
context.registerChannel("sourceChannel", sourceChannel);
sourceChannel.send(new StringMessage("test"));
QueueChannel targetChannel = new QueueChannel();
targetChannel.setBeanName("targetChannel");
context.getBeanFactory().registerSingleton("targetChannel", targetChannel);
context.registerChannel("targetChannel", targetChannel);
context.refresh();
bus.start();
Message<?> result = targetChannel.receive(100);
assertNull(result);
bus.stop();
context.stop();
}
@Test
@@ -116,15 +99,13 @@ public class ApplicationContextMessageBusTests {
PollableChannel sourceChannel = (PollableChannel) context.getBean("sourceChannel");
sourceChannel.send(new GenericMessage<String>("test"));
PollableChannel targetChannel = (PollableChannel) context.getBean("targetChannel");
Lifecycle bus = (Lifecycle) context.getBean("bus");
bus.start();
Message<?> result = targetChannel.receive(1000);
assertEquals("test", result.getPayload());
}
@Test
public void exactlyOneConsumerReceivesPointToPointMessage() {
GenericApplicationContext context = new GenericApplicationContext();
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel1 = new QueueChannel();
QueueChannel outputChannel2 = new QueueChannel();
@@ -140,35 +121,26 @@ public class ApplicationContextMessageBusTests {
replyHolder.set(message);
}
};
inputChannel.setBeanName("input");
outputChannel1.setBeanName("output1");
outputChannel2.setBeanName("output2");
context.getBeanFactory().registerSingleton("input", inputChannel);
context.getBeanFactory().registerSingleton("output1", outputChannel1);
context.getBeanFactory().registerSingleton("output2", outputChannel2);
context.registerChannel("input", inputChannel);
context.registerChannel("output1", outputChannel1);
context.registerChannel("output2", outputChannel2);
handler1.setOutputChannel(outputChannel1);
handler2.setOutputChannel(outputChannel2);
PollingConsumer endpoint1 = new PollingConsumer(inputChannel, handler1);
endpoint1.afterPropertiesSet();
PollingConsumer endpoint2 = new PollingConsumer(inputChannel, handler2);
endpoint2.afterPropertiesSet();
context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1);
context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2);
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
context.registerEndpoint("testEndpoint1", endpoint1);
context.registerEndpoint("testEndpoint2", endpoint2);
context.refresh();
bus.start();
inputChannel.send(new StringMessage("testing"));
Message<?> message1 = outputChannel1.receive(500);
Message<?> message2 = outputChannel2.receive(0);
bus.stop();
context.stop();
assertTrue("exactly one message should be null", message1 == null ^ message2 == null);
}
@Test
public void bothConsumersReceivePublishSubscribeMessage() throws InterruptedException {
GenericApplicationContext context = new GenericApplicationContext();
TestApplicationContext context = TestUtils.createTestApplicationContext();
PublishSubscribeChannel inputChannel = new PublishSubscribeChannel();
QueueChannel outputChannel1 = new QueueChannel();
QueueChannel outputChannel2 = new QueueChannel();
@@ -187,60 +159,45 @@ public class ApplicationContextMessageBusTests {
latch.countDown();
}
};
inputChannel.setBeanName("input");
outputChannel1.setBeanName("output1");
outputChannel2.setBeanName("output2");
context.getBeanFactory().registerSingleton("input", inputChannel);
context.getBeanFactory().registerSingleton("output1", outputChannel1);
context.getBeanFactory().registerSingleton("output2", outputChannel2);
context.registerChannel("input", inputChannel);
context.registerChannel("output1", outputChannel1);
context.registerChannel("output2", outputChannel2);
handler1.setOutputChannel(outputChannel1);
handler2.setOutputChannel(outputChannel2);
EventDrivenConsumer endpoint1 = new EventDrivenConsumer(inputChannel, handler1);
EventDrivenConsumer endpoint2 = new EventDrivenConsumer(inputChannel, handler2);
context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1);
context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2);
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
context.registerEndpoint("testEndpoint1", endpoint1);
context.registerEndpoint("testEndpoint2", endpoint2);
context.refresh();
bus.start();
inputChannel.send(new StringMessage("testing"));
latch.await(500, TimeUnit.MILLISECONDS);
assertEquals("both handlers should have been invoked", 0, latch.getCount());
Message<?> message1 = outputChannel1.receive(500);
Message<?> message2 = outputChannel2.receive(500);
bus.stop();
context.stop();
assertNotNull("both handlers should have replied to the message", message1);
assertNotNull("both handlers should have replied to the message", message2);
}
@Test
public void errorChannelWithFailedDispatch() throws InterruptedException {
GenericApplicationContext context = new GenericApplicationContext();
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel errorChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
errorChannel.setBeanName("errorChannel");
context.getBeanFactory().registerSingleton("errorChannel", errorChannel);
context.registerChannel("errorChannel", errorChannel);
CountDownLatch latch = new CountDownLatch(1);
SourcePollingChannelAdapter channelAdapter = new SourcePollingChannelAdapter();
channelAdapter.setSource(new FailingSource(latch));
channelAdapter.setTrigger(new IntervalTrigger(1000));
channelAdapter.setOutputChannel(outputChannel);
channelAdapter.setBeanName("testChannel");
context.getBeanFactory().registerSingleton("testChannel", channelAdapter);
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
SimpleTaskScheduler taskScheduler = (SimpleTaskScheduler) TestUtils.createTaskScheduler(10);
context.registerEndpoint("testChannel", channelAdapter);
ChannelResolver channelResolver = new BeanFactoryChannelResolver(context);
MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler(channelResolver);
errorHandler.setDefaultErrorChannel(errorChannel);
taskScheduler.setErrorHandler(errorHandler);
bus.setTaskScheduler(taskScheduler);
bus.setApplicationContext(context);
context.refresh();
bus.start();
latch.await(2000, TimeUnit.MILLISECONDS);
Message<?> message = errorChannel.receive(5000);
bus.stop();
context.stop();
assertNull(outputChannel.receive(0));
assertNotNull("message should not be null", message);
assertTrue(message instanceof ErrorMessage);
@@ -248,17 +205,11 @@ public class ApplicationContextMessageBusTests {
assertEquals("intentional test failure", exception.getMessage());
}
@Test(expected = BeanCreationException.class)
public void multipleMessageBusBeans() {
new ClassPathXmlApplicationContext("multipleMessageBusBeans.xml", this.getClass());
}
@Test
public void consumerSubscribedToErrorChannel() throws InterruptedException {
GenericApplicationContext context = new GenericApplicationContext();
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel errorChannel = new QueueChannel();
errorChannel.setBeanName(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME);
context.getBeanFactory().registerSingleton(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME, errorChannel);
context.registerChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, errorChannel);
final CountDownLatch latch = new CountDownLatch(1);
AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() {
@Override
@@ -267,16 +218,12 @@ public class ApplicationContextMessageBusTests {
}
};
PollingConsumer endpoint = new PollingConsumer(errorChannel, handler);
endpoint.afterPropertiesSet();
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
context.registerEndpoint("testEndpoint", endpoint);
context.refresh();
bus.start();
errorChannel.send(new ErrorMessage(new RuntimeException("test-exception")));
latch.await(1000, TimeUnit.MILLISECONDS);
assertEquals("handler should have received error message", 0, latch.getCount());
context.stop();
}

View File

@@ -21,14 +21,13 @@ import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.ThreadLocalChannel;
import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessagingException;
import org.springframework.integration.endpoint.EventDrivenConsumer;
@@ -37,15 +36,14 @@ import org.springframework.integration.handler.ReplyMessageHolder;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
*/
public class DirectChannelSubscriptionTests {
private GenericApplicationContext context = new GenericApplicationContext();
private ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
private TestApplicationContext context = TestUtils.createTestApplicationContext();
private DirectChannel sourceChannel = new DirectChannel();
@@ -54,30 +52,23 @@ public class DirectChannelSubscriptionTests {
@Before
public void setupChannels() {
sourceChannel.setBeanName("sourceChannel");
targetChannel.setBeanName("targetChannel");
context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel);
context.getBeanFactory().registerSingleton("targetChannel", targetChannel);
context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, bus);
bus.setApplicationContext(context);
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
context.registerChannel("sourceChannel", sourceChannel);
context.registerChannel("targetChannel", targetChannel);
}
@Test
public void sendAndReceiveForRegisteredEndpoint() {
GenericApplicationContext context = new GenericApplicationContext();
TestApplicationContext context = TestUtils.createTestApplicationContext();
ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(new TestBean(), "handle");
serviceActivator.setOutputChannel(targetChannel);
EventDrivenConsumer endpoint = new EventDrivenConsumer(sourceChannel, serviceActivator);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
bus.setApplicationContext(context);
context.registerEndpoint("testEndpoint", endpoint);
context.refresh();
bus.start();
this.sourceChannel.send(new StringMessage("foo"));
Message<?> response = this.targetChannel.receive();
assertEquals("foo!", response.getPayload());
bus.stop();
context.stop();
}
@Test
@@ -88,11 +79,10 @@ public class DirectChannelSubscriptionTests {
TestEndpoint endpoint = new TestEndpoint();
postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint");
context.refresh();
bus.start();
this.sourceChannel.send(new StringMessage("foo"));
Message<?> response = this.targetChannel.receive();
assertEquals("foo-from-annotated-endpoint", response.getPayload());
bus.stop();
context.stop();
}
@Test(expected = MessagingException.class)
@@ -105,27 +95,32 @@ public class DirectChannelSubscriptionTests {
};
handler.setOutputChannel(targetChannel);
EventDrivenConsumer endpoint = new EventDrivenConsumer(sourceChannel, handler);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
bus.setApplicationContext(context);
context.registerEndpoint("testEndpoint", endpoint);
context.refresh();
bus.start();
this.sourceChannel.send(new StringMessage("foo"));
try {
this.sourceChannel.send(new StringMessage("foo"));
}
finally {
context.stop();
}
}
@Test(expected = MessagingException.class)
public void exceptionThrownFromAnnotatedEndpoint() {
QueueChannel errorChannel = new QueueChannel();
errorChannel.setBeanName(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME);
context.getBeanFactory().registerSingleton(
ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME, errorChannel);
context.registerChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, errorChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
FailingTestEndpoint endpoint = new FailingTestEndpoint();
postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint");
context.refresh();
bus.start();
this.sourceChannel.send(new StringMessage("foo"));
try {
this.sourceChannel.send(new StringMessage("foo"));
}
finally {
context.stop();
}
}

View File

@@ -1,100 +0,0 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.bus;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import org.junit.Test;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.util.TestUtils;
/**
* @author Marius Bogoevici
* @author Mark Fisher
*/
public class MessageBusEventTests {
@Test
public void messageBusStartedEvent() {
GenericApplicationContext context = new GenericApplicationContext();
context.registerBeanDefinition("listener", new RootBeanDefinition(TestMessageBusEventListener.class));
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
TestMessageBusEventListener listener = (TestMessageBusEventListener) context.getBean("listener");
assertNull(listener.startedBus);
assertNull(listener.stoppedBus);
context.refresh(); // bus will start
assertNotNull(listener.startedBus);
assertNull(listener.stoppedBus);
assertEquals(messageBus, listener.startedBus);
}
@Test
public void messageBusStoppedEvent() {
GenericApplicationContext context = new GenericApplicationContext();
context.registerBeanDefinition("listener", new RootBeanDefinition(TestMessageBusEventListener.class));
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
TestMessageBusEventListener listener = (TestMessageBusEventListener) context.getBean("listener");
assertNull(listener.startedBus);
assertNull(listener.stoppedBus);
context.refresh();
assertNotNull(listener.startedBus);
assertNull(listener.stoppedBus);
messageBus.stop();
assertNotNull(listener.stoppedBus);
assertEquals(messageBus, listener.stoppedBus);
}
public static class TestMessageBusEventListener implements ApplicationListener {
private volatile ApplicationContextMessageBus startedBus;
private volatile ApplicationContextMessageBus stoppedBus;
public ApplicationContextMessageBus getStartedBus() {
return this.startedBus;
}
public ApplicationContextMessageBus getStoppedBus() {
return this.stoppedBus;
}
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof MessageBusStartedEvent) {
this.startedBus = (ApplicationContextMessageBus) event.getSource();
}
if (event instanceof MessageBusStoppedEvent) {
this.stoppedBus = (ApplicationContextMessageBus) event.getSource();
}
}
}
}

View File

@@ -4,13 +4,9 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="bus" class="org.springframework.integration.bus.ApplicationContextMessageBus">
<property name="taskScheduler">
<bean class="org.springframework.integration.util.TestUtils"
factory-method="createTaskScheduler">
<constructor-arg value="10"/>
</bean>
</property>
<bean id="taskScheduler" class="org.springframework.integration.util.TestUtils"
factory-method="createTaskScheduler">
<constructor-arg value="10"/>
</bean>
<bean id="sourceChannel" class="org.springframework.integration.channel.QueueChannel"/>

View File

@@ -1,11 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="bus1" class="org.springframework.integration.bus.ApplicationContextMessageBus"/>
<bean id="bus2" class="org.springframework.integration.bus.ApplicationContextMessageBus"/>
</beans>

View File

@@ -26,11 +26,10 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.endpoint.PollingConsumer;
@@ -39,19 +38,22 @@ import org.springframework.integration.handler.ReplyMessageHolder;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
*/
public class MessageChannelTemplateTests {
private TestApplicationContext context = TestUtils.createTestApplicationContext();
private QueueChannel requestChannel;
@Before
public void setUp() {
this.requestChannel = new QueueChannel();
this.requestChannel.setBeanName("requestChannel");
context.registerChannel("requestChannel", requestChannel);
AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() {
@Override
public void handleRequestMessage(Message<?> message, ReplyMessageHolder replyHolder) {
@@ -59,17 +61,14 @@ public class MessageChannelTemplateTests {
}
};
PollingConsumer endpoint = new PollingConsumer(requestChannel, handler);
endpoint.afterPropertiesSet();
GenericApplicationContext context = new GenericApplicationContext();
context.getBeanFactory().registerSingleton("requestChannel", requestChannel);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
context.registerEndpoint("testEndpoint", endpoint);
context.refresh();
bus.start();
}
@After
public void tearDown() {
context.stop();
}
@Test
public void send() {

View File

@@ -7,7 +7,7 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
<message-bus auto-startup="false"/>
<message-bus/>
<channel id="queueChannel">
<queue capacity="10"/>
@@ -17,7 +17,7 @@
<outbound-channel-adapter id="methodInvokingConsumer" ref="testBean" method="store"/>
<inbound-channel-adapter id="methodInvokingSource" ref="testBean" method="getMessage" channel="queueChannel">
<inbound-channel-adapter id="methodInvokingSource" ref="testBean" method="getMessage" channel="queueChannel" auto-startup="false">
<poller max-messages-per-poll="1">
<interval-trigger interval="10000"/>
</poller>

View File

@@ -21,18 +21,19 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.Lifecycle;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.channel.ChannelResolutionException;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.message.StringMessage;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
@@ -43,13 +44,22 @@ import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
@ContextConfiguration
public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests {
@Before
public void startContext() {
((AbstractApplicationContext) this.applicationContext).start();
}
@After
public void stopContext() {
((AbstractApplicationContext) this.applicationContext).stop();
}
@Test
public void targetOnly() {
String beanName = "outboundWithImplicitChannel";
Object channel = this.applicationContext.getBean(beanName);
assertTrue(channel instanceof DirectChannel);
Lifecycle bus = (Lifecycle) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
bus.start();
BeanFactoryChannelResolver channelResolver = new BeanFactoryChannelResolver(this.applicationContext);
assertNotNull(channelResolver.resolveChannelName(beanName));
Object adapter = this.applicationContext.getBean(beanName + ".adapter");
@@ -61,7 +71,6 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests
assertTrue(((MessageChannel) channel).send(message));
assertNotNull(consumer.getLastMessage());
assertEquals(message, consumer.getLastMessage());
bus.stop();
}
@Test
@@ -69,8 +78,6 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests
String beanName = "methodInvokingConsumer";
Object channel = this.applicationContext.getBean(beanName);
assertTrue(channel instanceof DirectChannel);
Lifecycle bus = (Lifecycle) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
bus.start();
BeanFactoryChannelResolver channelResolver = new BeanFactoryChannelResolver(this.applicationContext);
assertNotNull(channelResolver.resolveChannelName(beanName));
Object adapter = this.applicationContext.getBean(beanName + ".adapter");
@@ -82,24 +89,20 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests
assertTrue(((MessageChannel) channel).send(message));
assertNotNull(testBean.getMessage());
assertEquals("consumer test", testBean.getMessage());
bus.stop();
}
@Test
public void methodInvokingSource() {
String beanName = "methodInvokingSource";
PollableChannel channel = (PollableChannel) this.applicationContext.getBean("queueChannel");
Lifecycle bus = (Lifecycle) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
bus.start();
TestBean testBean = (TestBean) this.applicationContext.getBean("testBean");
testBean.store("source test");
Object adapter = this.applicationContext.getBean(beanName);
assertNotNull(adapter);
assertTrue(adapter instanceof SourcePollingChannelAdapter);
TestBean testBean = (TestBean) this.applicationContext.getBean("testBean");
testBean.store("source test");
Message<?> message = channel.receive(1000);
assertNotNull(message);
assertEquals("source test", testBean.getMessage());
bus.stop();
}
@Test(expected = ChannelResolutionException.class)

View File

@@ -17,25 +17,16 @@
package org.springframework.integration.config;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.BeanDefinitionStoreException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.Lifecycle;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.integration.bus.MessageBusEventTests.TestMessageBusEventListener;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
@@ -60,42 +51,6 @@ public class MessageBusParserTests {
assertEquals(context.getBean("errorChannel"), resolver.resolveChannelName("errorChannel"));
}
@Test
public void testMultipleMessageBusElements() {
boolean exceptionThrown = false;
try {
new ClassPathXmlApplicationContext("multipleMessageBusElements.xml", this.getClass());
}
catch (BeanDefinitionStoreException e) {
exceptionThrown = true;
assertEquals(IllegalStateException.class, e.getCause().getClass());
}
assertTrue(exceptionThrown);
}
@Test
public void testMessageBusElementAndBean() {
boolean exceptionThrown = false;
try {
new ClassPathXmlApplicationContext("messageBusElementAndBean.xml", this.getClass());
}
catch (BeanCreationException e) {
exceptionThrown = true;
assertEquals(IllegalStateException.class, e.getCause().getClass());
assertEquals(e.getBeanName(), MessageBusParser.MESSAGE_BUS_BEAN_NAME);
}
assertTrue(exceptionThrown);
}
@Test
public void testAutoStartup() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageBusWithAutoStartup.xml", this.getClass());
Lifecycle bus = (Lifecycle) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
assertTrue(bus.isRunning());
bus.stop();
}
@Test
public void testMulticasterIsSyncByDefault() {
ApplicationContext context = new ClassPathXmlApplicationContext(
@@ -131,42 +86,4 @@ public class MessageBusParserTests {
assertEquals(ThreadPoolTaskExecutor.class, taskExecutor.getClass());
}
@Test
public void testMessageBusEventListenerReceivesStartedEvent() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageBusWithListener.xml", this.getClass());
Lifecycle messageBus = (Lifecycle) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
TestMessageBusEventListener listener = (TestMessageBusEventListener) context.getBean("listener");
assertNull(listener.getStartedBus());
assertNull(listener.getStoppedBus());
messageBus.start();
assertNotNull(listener.getStartedBus());
assertEquals(messageBus, listener.getStartedBus());
assertNull(listener.getStoppedBus());
}
@Test
public void testMessageBusEventListenerReceivesStoppedEvent() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageBusWithListener.xml", this.getClass());
Lifecycle messageBus = (Lifecycle) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
TestMessageBusEventListener listener = (TestMessageBusEventListener) context.getBean("listener");
assertNull(listener.getStoppedBus());
messageBus.start();
messageBus.stop();
assertNotNull(listener.getStoppedBus());
assertEquals(messageBus, listener.getStoppedBus());
}
@Test
public void testMessageBusWithTaskScheduler() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageBusWithTaskScheduler.xml", this.getClass());
Object messageBus = context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
StubTaskScheduler schedulerBean = (StubTaskScheduler) context.getBean("testScheduler");
TaskScheduler busScheduler = (TaskScheduler) new DirectFieldAccessor(messageBus).getPropertyValue("taskScheduler");
assertNotNull(busScheduler);
assertEquals(schedulerBean, busScheduler);
}
}

View File

@@ -25,16 +25,14 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
@@ -44,19 +42,14 @@ public class ServiceActivatorAnnotationPostProcessorTests {
@Test
public void testAnnotatedMethod() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
GenericApplicationContext context = new GenericApplicationContext();
TestApplicationContext context = TestUtils.createTestApplicationContext();
RootBeanDefinition postProcessorDef = new RootBeanDefinition(MessagingAnnotationPostProcessor.class);
context.registerBeanDefinition("postProcessor", postProcessorDef);
context.registerBeanDefinition("testChannel", new RootBeanDefinition(QueueChannel.class));
RootBeanDefinition beanDefinition = new RootBeanDefinition(SimpleServiceActivatorAnnotationTestBean.class);
beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(latch);
context.registerBeanDefinition("testBean", beanDefinition);
String busBeanName = MessageBusParser.MESSAGE_BUS_BEAN_NAME;
RootBeanDefinition busBeanDefinition = new RootBeanDefinition(ApplicationContextMessageBus.class);
busBeanDefinition.getPropertyValues().addPropertyValue("taskScheduler", TestUtils.createTaskScheduler(10));
context.registerBeanDefinition(busBeanName, busBeanDefinition);
RootBeanDefinition postProcessorDef = new RootBeanDefinition(MessagingAnnotationPostProcessor.class);
context.registerBeanDefinition("postProcessor", postProcessorDef);
context.refresh();
context.start();
SimpleServiceActivatorAnnotationTestBean testBean = (SimpleServiceActivatorAnnotationTestBean) context.getBean("testBean");
assertEquals(1, latch.getCount());
assertNull(testBean.getMessageText());

View File

@@ -26,9 +26,9 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
@@ -55,7 +55,7 @@ public class AnnotatedEndpointActivationTests {
private PollableChannel output;
@Autowired
private ApplicationContextMessageBus messageBus;
private AbstractApplicationContext applicationContext;
// This has to be static because the MessageBus registers the handler
// more than once (every time a test instance is created), but only one of
@@ -90,15 +90,15 @@ public class AnnotatedEndpointActivationTests {
}
@Test(expected = MessageDeliveryException.class)
public void stopMessageBus() {
messageBus.stop();
public void stopContext() {
applicationContext.stop();
this.input.send(new GenericMessage<String>("foo"));
}
@Test
public void stopAndRestartMessageBus() {
messageBus.stop();
messageBus.start();
public void stopAndRestartContext() {
applicationContext.stop();
applicationContext.start();
this.input.send(new GenericMessage<String>("foo"));
Message<?> message = this.output.receive(100);
assertNotNull(message);

View File

@@ -28,22 +28,18 @@ import org.junit.Test;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.ChannelAdapter;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.channel.ChannelResolver;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.endpoint.PollingConsumer;
@@ -53,6 +49,7 @@ import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.IntervalTrigger;
import org.springframework.integration.scheduling.Trigger;
import org.springframework.integration.util.TestUtils;
import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
@@ -61,14 +58,9 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void serviceActivatorAnnotation() {
GenericApplicationContext context = new GenericApplicationContext();
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setApplicationContext(context);
context.registerChannel("inputChannel", inputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -131,12 +123,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void targetAnnotation() throws InterruptedException {
GenericApplicationContext context = new GenericApplicationContext();
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
TestApplicationContext context = TestUtils.createTestApplicationContext();
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -144,14 +131,13 @@ public class MessagingAnnotationPostProcessorTests {
OutboundChannelAdapterTestBean testBean = new OutboundChannelAdapterTestBean(latch);
postProcessor.postProcessAfterInitialization(testBean, "testBean");
context.refresh();
messageBus.start();
ChannelResolver channelResolver = new BeanFactoryChannelResolver(context);
MessageChannel testChannel = channelResolver.resolveChannelName("testChannel");
testChannel.send(new StringMessage("foo"));
latch.await(1000, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
assertEquals("foo", testBean.getMessageText());
messageBus.stop();
context.stop();
}
@Test(expected = IllegalArgumentException.class)
@@ -161,29 +147,13 @@ public class MessagingAnnotationPostProcessorTests {
postProcessor.afterPropertiesSet();
}
@Test(expected = NoSuchBeanDefinitionException.class)
public void testPostProcessorWithoutMessageBus() {
GenericApplicationContext context = new GenericApplicationContext();
MessagingAnnotationPostProcessor postProcessor =
new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
}
@Test
public void testChannelResolution() {
GenericApplicationContext context = new GenericApplicationContext();
TestApplicationContext context = TestUtils.createTestApplicationContext();
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setApplicationContext(context);
context.registerChannel("inputChannel", inputChannel);
context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -195,22 +165,16 @@ public class MessagingAnnotationPostProcessorTests {
inputChannel.send(message);
Message<?> reply = outputChannel.receive(0);
assertNotNull(reply);
context.stop();
}
@Test
public void testProxiedMessageEndpointAnnotation() {
GenericApplicationContext context = new GenericApplicationContext();
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
context.registerChannel("inputChannel", inputChannel);
context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -218,53 +182,37 @@ public class MessagingAnnotationPostProcessorTests {
Object proxy = proxyFactory.getProxy();
postProcessor.postProcessAfterInitialization(proxy, "proxy");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("world"));
Message<?> message = outputChannel.receive(1000);
assertEquals("hello world", message.getPayload());
messageBus.stop();
context.stop();
}
@Test
public void testMessageEndpointAnnotationInherited() {
GenericApplicationContext context = new GenericApplicationContext();
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
context.registerChannel("inputChannel", inputChannel);
context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointSubclass(), "subclass");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("world"));
Message<?> message = outputChannel.receive(1000);
assertEquals("hello world", message.getPayload());
messageBus.stop();
context.stop();
}
@Test
public void testMessageEndpointAnnotationInheritedWithProxy() {
GenericApplicationContext context = new GenericApplicationContext();
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
context.registerChannel("inputChannel", inputChannel);
context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -272,79 +220,55 @@ public class MessagingAnnotationPostProcessorTests {
Object proxy = proxyFactory.getProxy();
postProcessor.postProcessAfterInitialization(proxy, "proxy");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("world"));
Message<?> message = outputChannel.receive(1000);
assertEquals("hello world", message.getPayload());
messageBus.stop();
context.stop();
}
@Test
public void testMessageEndpointAnnotationInheritedFromInterface() {
GenericApplicationContext context = new GenericApplicationContext();
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
context.registerChannel("inputChannel", inputChannel);
context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("ABC"));
Message<?> message = outputChannel.receive(1000);
assertEquals("test-ABC", message.getPayload());
messageBus.stop();
context.stop();
}
@Test
public void testMessageEndpointAnnotationInheritedFromInterfaceWithAutoCreatedChannels() {
GenericApplicationContext context = new GenericApplicationContext();
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
context.registerChannel("inputChannel", inputChannel);
context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("ABC"));
Message<?> message = outputChannel.receive(1000);
assertEquals("test-ABC", message.getPayload());
messageBus.stop();
context.stop();
}
@Test
public void testMessageEndpointAnnotationInheritedFromInterfaceWithProxy() {
GenericApplicationContext context = new GenericApplicationContext();
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
context.registerChannel("inputChannel", inputChannel);
context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -352,23 +276,17 @@ public class MessagingAnnotationPostProcessorTests {
Object proxy = proxyFactory.getProxy();
postProcessor.postProcessAfterInitialization(proxy, "proxy");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("ABC"));
Message<?> message = outputChannel.receive(1000);
assertEquals("test-ABC", message.getPayload());
messageBus.stop();
context.stop();
}
@Test
public void testEndpointWithPollerAnnotation() {
GenericApplicationContext context = new GenericApplicationContext();
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel testChannel = new QueueChannel();
testChannel.setBeanName("testChannel");
context.getBeanFactory().registerSingleton("testChannel", testChannel);
messageBus.setApplicationContext(context);
context.registerChannel("testChannel", testChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -385,19 +303,12 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testChannelAdapterAnnotation() throws InterruptedException {
GenericApplicationContext context = new GenericApplicationContext();
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
TestApplicationContext context = TestUtils.createTestApplicationContext();
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ChannelAdapterAnnotationTestBean testBean = new ChannelAdapterAnnotationTestBean();
postProcessor.postProcessAfterInitialization(testBean, "testBean");
context.refresh();
messageBus.start();
ChannelResolver channelResolver = new BeanFactoryChannelResolver(context);
DirectChannel testChannel = (DirectChannel) channelResolver.resolveChannelName("testChannel");
final CountDownLatch latch = new CountDownLatch(1);
@@ -408,26 +319,21 @@ public class MessagingAnnotationPostProcessorTests {
latch.countDown();
}
});
context.refresh();
latch.await(3, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
assertNotNull(receivedMessage.get());
assertEquals("test", receivedMessage.get().getPayload());
messageBus.stop();
context.stop();
}
@Test
public void testTransformer() {
GenericApplicationContext context = new GenericApplicationContext();
TestApplicationContext context = TestUtils.createTestApplicationContext();
DirectChannel inputChannel = new DirectChannel();
inputChannel.setBeanName("inputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.registerChannel("inputChannel", inputChannel);
QueueChannel outputChannel = new QueueChannel();
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -437,6 +343,7 @@ public class MessagingAnnotationPostProcessorTests {
inputChannel.send(new StringMessage("foo"));
Message<?> reply = outputChannel.receive(0);
assertEquals("FOO", reply.getPayload());
context.stop();
}

View File

@@ -21,25 +21,21 @@ import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
*/
public class RouterAnnotationPostProcessorTests {
private GenericApplicationContext context = new GenericApplicationContext();
private ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
private TestApplicationContext context = TestUtils.createTestApplicationContext();
private DirectChannel inputChannel = new DirectChannel();
@@ -48,14 +44,8 @@ public class RouterAnnotationPostProcessorTests {
@Before
public void init() {
messageBus.setApplicationContext(context);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
inputChannel.setBeanName("input");
outputChannel.setBeanName("output");
context.getBeanFactory().registerSingleton("input", inputChannel);
context.getBeanFactory().registerSingleton("output", outputChannel);
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
context.registerChannel("input", inputChannel);
context.registerChannel("output", outputChannel);
}
@@ -67,10 +57,10 @@ public class RouterAnnotationPostProcessorTests {
TestRouter testRouter = new TestRouter();
postProcessor.postProcessAfterInitialization(testRouter, "test");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("foo"));
Message<?> replyMessage = outputChannel.receive(0);
assertEquals("foo", replyMessage.getPayload());
context.stop();
}

View File

@@ -23,25 +23,21 @@ import static org.junit.Assert.assertNull;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
*/
public class SplitterAnnotationPostProcessorTests {
private GenericApplicationContext context = new GenericApplicationContext();
private ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
private TestApplicationContext context = TestUtils.createTestApplicationContext();
private DirectChannel inputChannel = new DirectChannel();
@@ -50,14 +46,8 @@ public class SplitterAnnotationPostProcessorTests {
@Before
public void init() {
inputChannel.setBeanName("input");
outputChannel.setBeanName("output");
context.getBeanFactory().registerSingleton("input", inputChannel);
context.getBeanFactory().registerSingleton("output", outputChannel);
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
context.registerChannel("input", inputChannel);
context.registerChannel("output", outputChannel);
}
@@ -69,7 +59,6 @@ public class SplitterAnnotationPostProcessorTests {
TestSplitter splitter = new TestSplitter();
postProcessor.postProcessAfterInitialization(splitter, "testSplitter");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("this.is.a.test"));
Message<?> message1 = outputChannel.receive(500);
assertNotNull(message1);
@@ -84,7 +73,7 @@ public class SplitterAnnotationPostProcessorTests {
assertNotNull(message4);
assertEquals("test", message4.getPayload());
assertNull(outputChannel.receive(0));
messageBus.stop();
context.stop();
}

View File

@@ -36,6 +36,7 @@ import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.core.MessageHeaders;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.util.TestUtils;
/**
* @author Iwein Fuld
@@ -59,6 +60,7 @@ public class SimpleMessagingGatewayTests {
this.simpleMessagingGateway = new SimpleMessagingGateway();
this.simpleMessagingGateway.setRequestChannel(requestChannel);
this.simpleMessagingGateway.setReplyChannel(replyChannel);
this.simpleMessagingGateway.setBeanFactory(TestUtils.createTestApplicationContext());
reset(allmocks);
}

View File

@@ -26,14 +26,13 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessagingException;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.handler.MethodInvokingMessageHandler;
import org.springframework.integration.util.TestUtils;
import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
@@ -75,27 +74,22 @@ public class MethodInvokingMessageHandlerTests {
@Test
public void subscription() throws Exception {
GenericApplicationContext context = new GenericApplicationContext();
TestApplicationContext context = TestUtils.createTestApplicationContext();
SynchronousQueue<String> queue = new SynchronousQueue<String>();
TestBean testBean = new TestBean(queue);
QueueChannel channel = new QueueChannel();
channel.setBeanName("channel");
context.getBeanFactory().registerSingleton("channel", channel);
context.registerChannel("channel", channel);
Message<String> message = new GenericMessage<String>("testing");
channel.send(message);
assertNull(queue.poll());
MethodInvokingMessageHandler handler = new MethodInvokingMessageHandler(testBean, "foo");
PollingConsumer endpoint = new PollingConsumer(channel, handler);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
context.registerEndpoint("testEndpoint", endpoint);
context.refresh();
bus.start();
String result = queue.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(result);
assertEquals("testing", result);
bus.stop();
context.stop();
}

View File

@@ -19,6 +19,16 @@ package org.springframework.integration.util;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.FatalBeanException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.scheduling.SimpleTaskScheduler;
import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -56,6 +66,12 @@ public abstract class TestUtils {
return (T) value;
}
public static TestApplicationContext createTestApplicationContext() {
TestApplicationContext context = new TestApplicationContext();
registerBean(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME, createTaskScheduler(10), context);
return context;
}
public static TaskScheduler createTaskScheduler(int poolSize) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(poolSize);
@@ -64,4 +80,56 @@ public abstract class TestUtils {
return new SimpleTaskScheduler(executor);
}
private static void registerBean(String beanName, Object bean, BeanFactory beanFactory) {
Assert.notNull(beanName, "bean name must not be null");
ConfigurableListableBeanFactory configurableListableBeanFactory = null;
if (beanFactory instanceof ConfigurableListableBeanFactory) {
configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;
}
else if (beanFactory instanceof GenericApplicationContext) {
configurableListableBeanFactory = ((GenericApplicationContext) beanFactory).getBeanFactory();
}
if (bean instanceof BeanNameAware) {
((BeanNameAware) bean).setBeanName(beanName);
}
if (bean instanceof BeanFactoryAware) {
((BeanFactoryAware) bean).setBeanFactory(beanFactory);
}
if (bean instanceof InitializingBean) {
try {
((InitializingBean) bean).afterPropertiesSet();
}
catch (Exception e) {
throw new FatalBeanException("failed to register bean with test context", e);
}
}
configurableListableBeanFactory.registerSingleton(beanName, bean);
}
public static class TestApplicationContext extends GenericApplicationContext {
private TestApplicationContext() {
super();
}
public void registerChannel(String channelName, MessageChannel channel) {
if (channel.getName() != null) {
if (channelName == null) {
Assert.notNull(channel.getName(), "channel name must not be null");
channelName = channel.getName();
}
else {
Assert.isTrue(channel.getName().equals(channelName),
"channel name has already been set with a conflicting value");
}
}
registerBean(channelName, channel, this);
}
public void registerEndpoint(String endpointName, AbstractEndpoint endpoint) {
registerBean(endpointName, endpoint, this);
}
}
}